Source code for pubsub.core.topicargspec

"""
Definitions related to message data specification.

:copyright: Copyright since 2006 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE_BSD_Simple.txt for details.
"""

import weakref
from typing import Tuple, List, Sequence as Seq, Mapping, Dict, Callable, Any, Optional, Union

from .topicutils import stringize, WeakNone
from .annotations import annotationType
from .topicexc import MessageDataSpecError
from .listener import getArgs as getListenerArgs, UserListener


ArgsDocs = Dict[str, str]
MsgData = Mapping[str, Any]


def verifyArgsDifferent(allArgs, allParentArgs, topicName):
    """
    Verify that allArgs does not contain any of allParentArgs. Raise
    MessageDataSpecError if fail.
    """
    extra = set(allArgs).intersection(allParentArgs)
    if extra:
        msg = 'Args %%s already used in parent of "%s"' % topicName
        raise MessageDataSpecError(msg, tuple(extra))


def verifySubset(all, sub, topicName, extraMsg=''):
    """
    Verify that sub is a subset of all for topicName. Raise
    MessageDataSpecError if fail.
    """
    notInAll = set(sub).difference(all)
    if notInAll:
        args = ','.join(all)
        msg = 'Params [%s] missing inherited [%%s] for topic "%s"%s' % (args, topicName, extraMsg)
        raise MessageDataSpecError(msg, tuple(notInAll))


def topicArgsFromCallable(_callable: UserListener, ignoreArgs: Seq[str] = ()) -> Tuple[ArgsDocs, List[str]]:
    """
    Get the topic message data names and list of those that are required,
    by introspecting given callable. Returns a pair, (args, required)
    where args is a dictionary of allowed message data names vs docstring,
    and required states which ones are required rather than optional.
    """
    argsInfo = getListenerArgs(_callable, ignoreArgs=ignoreArgs)
    required = argsInfo.getRequiredArgs()
    defaultDoc = 'UNDOCUMENTED'
    args = dict.fromkeys(argsInfo.allParams, defaultDoc)
    return args, required


class ArgSpecGiven:
    """
    The message data specification (MDS) for a topic.

    This consists of each argument name that listener should have in its
    signature, plus which ones are required in any sendMessage(), and a
    documentation string for each argument. This instance will be transformed
    into an ArgsInfo object which is basically a superset of that information,
    needed to ensure that the arguments specifications satisfy
    pubsub policies for chosen API version.
    """

    SPEC_GIVEN_NONE = 1  # specification not given
    SPEC_GIVEN_ALL = 3  # all args specified

    def __init__(self, argsDocs: ArgsDocs = None, reqdArgs: Seq[str] = None):
        self.reqdArgs = tuple(reqdArgs or ())

        if argsDocs is None:
            self.argsSpecType = ArgSpecGiven.SPEC_GIVEN_NONE
            self.argsDocs = {}
        else:
            self.argsSpecType = ArgSpecGiven.SPEC_GIVEN_ALL
            self.argsDocs = argsDocs

            # check that all args marked as required are in argsDocs
            missingArgs = set(self.reqdArgs).difference(self.argsDocs.keys())  # py3: iter keys ok
            if missingArgs:
                msg = 'Params [%s] missing inherited required args [%%s]' % ','.join(argsDocs.keys())  # iter keys ok
                raise MessageDataSpecError(msg, missingArgs)

    def setAll(self, allArgsDocs: ArgsDocs, reqdArgs: Seq[str] = None):
        self.argsDocs = allArgsDocs
        self.reqdArgs = reqdArgs or ()
        self.argsSpecType = ArgSpecGiven.SPEC_GIVEN_ALL

    def isComplete(self) -> bool:
        """Returns True if the definition is usable, false otherwise."""
        return self.argsSpecType == ArgSpecGiven.SPEC_GIVEN_ALL

    def getOptional(self) -> List[str]:
        """Get the list of optional arguments"""
        return tuple(set(self.argsDocs.keys()).difference(self.reqdArgs))

    def __str__(self):
        return "%s, %s, %s" % (self.argsDocs, self.reqdArgs, self.argsSpecType)


[docs]class SenderMissingReqdMsgDataError(RuntimeError): """ Raised when a sendMessage() is missing arguments tagged as 'required' by pubsub topic of message. """ def __init__(self, topicName: str, argNames: Seq[str], missing: Seq[str]): argsStr = ','.join(argNames) missStr = ','.join(missing) msg = "Some required args missing in call to sendMessage('%s', %s): %s" \ % (stringize(topicName), argsStr, missStr) RuntimeError.__init__(self, msg)
[docs]class SenderUnknownMsgDataError(RuntimeError): """ Raised when a sendMessage() has arguments not listed among the topic's message data specification (MDS). """ def __init__(self, topicName: str, argNames: Seq[str], extra: Seq[str]): argsStr = ','.join(argNames) extraStr = ','.join(extra) msg = "Some optional args unknown in call to sendMessage('%s', %s): %s" \ % (topicName, argsStr, extraStr) RuntimeError.__init__(self, msg)
@annotationType class ArgsInfo: pass class ArgsInfo: """ Encode the Message Data Specification (MDS) for a given topic. ArgsInfos form a tree identical to that of Topics in that ArgInfos have a reference to their parent and children ArgInfos, created for the parent and children topics. The only difference between an ArgsInfo and an ArgSpecGiven is that the latter is what "user thinks is ok" whereas former has been validated: the specification for this topic is a strict superset of the specification of its parent, and a strict subset of the specification of each of its children. Also, the instance can be used to check validity and filter arguments. The MDS can be created "empty", ie "incomplete", meaning it cannot yet be used to validate listener subscriptions to topics. """ SPEC_MISSING = 10 # no args given SPEC_COMPLETE = 12 # all args, but not confirmed via user spec def __init__(self, topicNameTuple: Seq[str], specGiven: ArgSpecGiven, parentArgsInfo: ArgsInfo): self.topicNameTuple = topicNameTuple self.allOptional = () # topic message optional arg names self.allDocs = {} # doc for each arg self.allRequired = () # topic message required arg names self.argsSpecType = self.SPEC_MISSING self.parentAI = WeakNone() if parentArgsInfo is not None: self.parentAI = weakref.ref(parentArgsInfo) parentArgsInfo.__addChildAI(self) self.childrenAI = [] if specGiven.isComplete(): self.__setAllArgs(specGiven) if parentArgsInfo is None: assert self.argsAddedToParent is not None else: while not parentArgsInfo.isComplete(): parentArgsInfo = parentArgsInfo.parentAI() self.argsAddedToParent = set(self.getArgs()).difference(parentArgsInfo.getArgs()) def isComplete(self) -> bool: return self.argsSpecType == self.SPEC_COMPLETE def getArgs(self) -> List[str]: return self.allOptional + self.allRequired def numArgs(self) -> int: return len(self.allOptional) + len(self.allRequired) def getReqdArgs(self) -> List[str]: return self.allRequired def getOptArgs(self) -> List[str]: return self.allOptional def getArgsDocs(self) -> ArgsDocs: return self.allDocs.copy() def setArgsDocs(self, docs: ArgsDocs): """docs is a mapping from arg names to their documentation""" if not self.isComplete(): raise RuntimeError('Topic MDS is not complete, cannot set docs!') for arg, doc in docs.items(): self.allDocs[arg] = doc def check(self, msgData: MsgData): """ Check that the message arguments given satisfy the topic message data specification (MDS). :param msgData: the topic message data to check for validity :raise SenderMissingReqdMsgDataError: if some required args are missing or not known :raise SenderUnknownMsgDataError: if some optional args are unknown. """ all = set(msgData) # check that it has all required args needReqd = set(self.allRequired) hasReqd = (needReqd <= all) if not hasReqd: raise SenderMissingReqdMsgDataError( self.topicNameTuple, list(msgData.keys()), needReqd - all) # check that all other args are among the optional spec optional = all - needReqd ok = (optional <= set(self.allOptional)) if not ok: raise SenderUnknownMsgDataError(self.topicNameTuple, list(msgData.keys()), optional - set(self.allOptional)) def filterArgs(self, msgData: MsgData) -> MsgData: """ Returns a dict which contains only those items of msgData which are defined for topic. E.g. if msgData is {a:1, b:'b'} and topic arg spec is ('a',) then return {a:1}. The returned dict is valid only if check(msgData) was called (or check(superset of msgData) was called). :param msgData: the topic message data to filter """ assert self.isComplete() if len(msgData) == self.numArgs(): return msgData # only keep the keys from msgData that are also in topic's kwargs # method 1: SLOWEST # newKwargs = dict( (k,msgData[k]) for k in self.__msgArgs.allOptional if k in msgData ) # newKwargs.update( (k,msgData[k]) for k in self.__msgArgs.allRequired ) # method 2: FAST: # argNames = self.__msgArgs.getArgs() # newKwargs = dict( (key, val) for (key, val) in msgData.iteritems() if key in argNames ) # method 3: FASTEST: argNames = set(self.getArgs()).intersection(msgData) newKwargs = dict((k, msgData[k]) for k in argNames) return newKwargs def hasSameArgs(self, *argNames: Seq[str]) -> bool: """ Returns true if self has all the message arguments given, no more and no less. Order does not matter. So if getArgs() returns ('arg1', 'arg2') then self.hasSameArgs('arg2', 'arg1') will return true. """ return set(argNames) == set(self.getArgs()) def hasParent(self, argsInfo: ArgsInfo) -> bool: """return True if self has argsInfo object as parent""" return self.parentAI() is argsInfo def getCompleteAI(self) -> ArgsInfo: """ Get the closest arg spec, starting from self and moving to parent, that is complete. So if self.isComplete() is True, then returns self, otherwise returns parent (if parent.isComplete()), etc. """ AI = self while AI is not None: if AI.isComplete(): return AI AI = AI.parentAI() # dereference weakref return None def updateAllArgsFinal(self, topicDefn: ArgSpecGiven): """ This can only be called once, if the construction was done with ArgSpecGiven.SPEC_GIVEN_NONE """ assert not self.isComplete() assert topicDefn.isComplete() self.__setAllArgs(topicDefn) def __addChildAI(self, childAI: ArgsInfo): assert childAI not in self.childrenAI self.childrenAI.append(childAI) def __notifyParentCompleted(self): """Parent should call this when parent ArgsInfo has been completed""" assert self.parentAI().isComplete() if self.isComplete(): # verify that our spec is compatible with parent's self.__validateArgsToParent() self.argsAddedToParent = set(self.getArgs()).difference(self.parentAI().getArgs()) else: for argsInfo in self.childrenAI: argsInfo.__notifyAncestorCompleted(self.parentAI()) def __notifyAncestorCompleted(self, parentAI): if self.isComplete(): # verify that our spec is compatible with parent's self.__validateArgsToParent() self.argsAddedToParent = set(self.getArgs()).difference(parentAI.getArgs()) else: for argsInfo in self.childrenAI: argsInfo.__notifyAncestorCompleted(parentAI) def __validateArgsToParent(self): # validate relative to parent arg spec closestParentAI = self.parentAI().getCompleteAI() if closestParentAI is not None: # verify that parent args is a subset of spec given: topicName = stringize(self.topicNameTuple) verifySubset(self.getArgs(), closestParentAI.getArgs(), topicName) verifySubset(self.allRequired, closestParentAI.getReqdArgs(), topicName, ' required args') def __setAllArgs(self, specGiven: ArgSpecGiven): assert specGiven.isComplete() self.allOptional = tuple(specGiven.getOptional()) self.allRequired = specGiven.reqdArgs self.allDocs = specGiven.argsDocs.copy() # doc for each arg self.argsSpecType = self.SPEC_COMPLETE parentArgsInfo = self.parentAI() if parentArgsInfo is None: self.argsAddedToParent = [] else: self.__validateArgsToParent() while not parentArgsInfo.isComplete(): parentArgsInfo = parentArgsInfo.parentAI() self.argsAddedToParent = set(self.getArgs()).difference(parentArgsInfo.getArgs()) # notify our children for childAI in self.childrenAI: childAI.__notifyParentCompleted()