Source code for pubsub.core.listener

"""
Top-level functionality related to message listeners.

:copyright: Copyright since 2006 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE_BSD_Simple.txt for details.
"""

from types import ModuleType
from typing import Callable, Mapping, Any, Sequence

from .callables import (
    getID,
    getArgs,
    ListenerMismatchError,
    CallArgsInfo,
    AUTO_TOPIC as _AUTO_ARG,
    UserListener,
)
from .weakmethod import getWeakRef, WeakRef
from .annotations import annotationType

__all__ = [
    'Listener',
    'IListenerExcHandler',
    'ListenerValidator'
]


@annotationType
class Topic:
    pass


@annotationType
class Listener:
    """Wrapper of a UserListener"""
    pass


[docs]class IListenerExcHandler: """ Interface class base class for any handler given to pub.setListenerExcHandler() Such handler is called whenever a listener raises an exception during a pub.sendMessage(). Example:: from pubsub import pub class MyHandler(pub.IListenerExcHandler): def __call__(self, listenerID, topicObj): ... do something with listenerID ... pub.setListenerExcHandler(MyHandler()) Without an exception handler, the sendMessage() will fail. """ def __call__(self, listenerID: str, topicObj: Topic): raise NotImplementedError('%s must override __call__()' % self.__class__)
[docs]class Listener: """ Wraps a callable (UserListener) so it can be stored by weak reference and introspected to verify that it adheres to a topic's MDS. A Listener instance has the same hash value as the callable that it wraps. Callables that have 'argName=pub.AUTO_TOPIC' as a kwarg will be given the Topic object for the message sent by sendMessage(). Such a Listener will have wantsTopicObjOnCall() True. Callables that have a '** kargs' argument will receive all message data, not just that for the topic they are subscribed to. Such a listener will have wantsAllMessageData() True. """ AUTO_TOPIC = _AUTO_ARG def __init__(self, callable_obj: UserListener, argsInfo: CallArgsInfo, curriedArgs: Mapping[str, Any] = None, onDead: Callable[[Listener], None] = None): """ Use callable_obj as a listener of topicName. The argsInfo is the return value from a Validator, ie an instance of callables.CallArgsInfo. If given, the onDead will be called with self as parameter, if/when callable_obj gets garbage collected (callable_obj is held only by weak reference). """ # set call policies self.acceptsAllKwargs = argsInfo.acceptsAllKwargs self.curriedArgs = curriedArgs self._autoTopicArgName = argsInfo.autoTopicArgName self._callable = getWeakRef(callable_obj, self.__notifyOnDead) self.__onDead = onDead # save identity now in case callable dies: name, mod = getID(callable_obj) # self.__nameID = name self.__module = mod self.__id = str(id(callable_obj))[-4:] # only last four digits of id self.__hash = hash(callable_obj)
[docs] def name(self) -> str: """ Return a human readable name for listener, based on the listener's type name and its id (as obtained from id(listener)). If caller just needs name based on type info, specify instance=False. Note that the listener's id() was saved at construction time (since it may get garbage collected at any time) so the return value of name() is not necessarily unique if the callable has died (because id's can be re-used after garbage collection). """ return '%s_%s' % (self.__nameID, self.__id)
[docs] def typeName(self) -> str: """ Get a type name for the listener. This is a class name or function name, as appropriate. """ return self.__nameID
[docs] def module(self) -> ModuleType: """ Get the module in which the callable was defined. """ return self.__module
[docs] def getCallable(self) -> UserListener: """ Get the listener that was given at initialization. Note that this could be None if it has been garbage collected (e.g. if it was created as a wrapper of some other callable, and not stored locally). """ return self._callable()
[docs] def isDead(self) -> bool: """Return True if this listener died (has been garbage collected)""" return self._callable() is None
[docs] def wantsTopicObjOnCall(self) -> bool: """True if this listener wants topic object: it has a arg=pub.AUTO_TOPIC""" return self._autoTopicArgName is not None
[docs] def wantsAllMessageData(self) -> bool: """True if this listener wants all message data: it has a ** kwargs argument""" return self.acceptsAllKwargs
[docs] def setCurriedArgs(self, **curriedArgs): """ Curry the wrapped listener so it appears to *not* have list(curriedArgs) among its parameters. The curriedArgs key-value pairs will be given to wrapped listener at call time. """ if curriedArgs.keys() != self.curriedArgs.keys(): raise ValueError( "Listener '{}' already subscribed with a different set of pure curried args ({} != {})" .format(self, curriedArgs.keys(), self.curriedArgs.keys())) self.curriedArgs = curriedArgs
def _unlinkFromTopic_(self): """Tell self that it is no longer used by a Topic. This allows to break some cyclical references.""" self.__onDead = None def _calledWhenDead(self): raise RuntimeError('BUG: Dead Listener called, still subscribed!') def __notifyOnDead(self, _: WeakRef): """This gets called when listener weak ref has died. Propagate info to Topic.""" notifyDeath = self.__onDead self._unlinkFromTopic_() if notifyDeath is not None: notifyDeath(self) def __eq__(self, rhs: Listener): """ Compare for equality to rhs. This returns true if rhs has our id id(rhs) is same as id(self) or id(callable in self). """ if id(self) == id(rhs): return True c1 = self._callable() try: c2 = rhs._callable() except Exception: # then rhs is not a Listener, compare with c1 return c1 == rhs # both side of == are Listener, but always compare unequal if both dead if c2 is None and c1 is None: return False return c1 == c2 def __ne__(self, rhs: Listener): """Counterpart to __eq__ MUST be defined... equivalent to 'not (self == rhs)'.""" return not self.__eq__(rhs) def __hash__(self): """ Hash is an optimization for dict/set searches, it need not return different numbers for every different object. """ return self.__hash def __str__(self): """String rep is the callable""" return self.__nameID def __call__(self, kwargs: Mapping[str, Any], actualTopic: Topic, allKwargs: Mapping[str, Any] = None): """ Call the listener with **kwargs. Note that it raises RuntimeError if listener is dead. Should always return True (False would require the callable_obj be dead but self hasn't yet been notified of it...). """ if self.acceptsAllKwargs: kwargs = allKwargs or kwargs # if allKwargs is None then use kwargs orig_kwargs = kwargs # combine with curried args; Note: this overrides topic arg if present: if self.curriedArgs: if kwargs: kwargs = kwargs.copy() kwargs.update(self.curriedArgs) else: kwargs = self.curriedArgs if self._autoTopicArgName is not None: if kwargs is orig_kwargs: kwargs = kwargs.copy() kwargs[self._autoTopicArgName] = actualTopic # call: cb = self._callable() if cb is None: self._calledWhenDead() cb(**kwargs) return True
class ListenerValidator: """ Validates listeners. It checks whether the listener given to validate() method complies with required and optional arguments specified for topic. Do not accept any required args or *args; accept any **kwarg, and require that the Listener have at least all the kwargs (can have extra) of Topic. """ def __init__(self, topicArgs: Sequence[str], topicKwargs: Sequence[str]): """ :param topicArgs: list of argument names that will be required when sending a message to listener. Hence order of items in topicArgs matters. :param topicKwargs: list of argument names that will be optional, ie given as keyword arguments when sending a message to listener. The list is unordered. """ self._topicArgs = set(topicArgs) self._topicKwargs = set(topicKwargs) def validate(self, listener: UserListener, curriedArgNames: Sequence[str] = None) -> CallArgsInfo: """ Validate that listener (with, optionally, given curried parameters) satisfies the requirements of being a topic listener. :param listener: the callable to validate :param curriedArgNames: the list of parameter names to treat as curried :returns: a CallArgsInfo object containing information about the listener's call arguments, such as whether listener wants topic name (signified by a kwarg value = AUTO_TOPIC in listener signature). :raises ListenerMismatchError: if listener not usable for topic """ paramsInfo = getArgs(listener) self.__validateArgs(listener, paramsInfo, curriedArgNames) return paramsInfo # noinspection PyIncorrectDocstring def isValid(self, listener: UserListener, curriedArgNames: Sequence[str] = None) -> bool: """Same as validate() but returns True/False instead of raising an exception.""" try: self.validate(listener, curriedArgNames=curriedArgNames) return True except ListenerMismatchError: return False def __validateArgs(self, listener: UserListener, paramsInfo: CallArgsInfo, curriedArgNames: Sequence[str]): # accept **kwargs # accept *args # check if listener missing params (only possible if # paramsInfo.acceptsAllKwargs is False) if not paramsInfo.acceptsAllKwargs: allTopicMsgArgs = self._topicArgs | self._topicKwargs allParams = set(paramsInfo.allParams) missingParams = allTopicMsgArgs - allParams if missingParams: msg = 'needs to accept %s more args (%s)' \ % (len(missingParams), ', '.join(missingParams)) raise ListenerMismatchError(msg, listener, missingParams) else: # then can accept that some parameters missing from listener # signature pass if curriedArgNames: unrecognizedCurried = set(curriedArgNames).difference(paramsInfo.allParams) if unrecognizedCurried: msg = 'does not have following args: (%s)' % ', '.join(unrecognizedCurried) raise ListenerMismatchError(msg, listener, unrecognizedCurried) curriedTopicArgs = set(curriedArgNames).intersection(self._topicArgs | self._topicKwargs) if curriedTopicArgs: msg = 'curried args (%s) are topic args, not allowed' % ', '.join(curriedTopicArgs) raise ListenerMismatchError(msg, listener, curriedTopicArgs) # check if there are extra required parameters in listener signature: extraArgs = set(paramsInfo.getRequiredArgs()) - self._topicArgs if extraArgs and curriedArgNames: extraArgs = extraArgs.difference(curriedArgNames) if extraArgs: msg = 'required args (%s) not allowed (could curry them), ' % ','.join(extraArgs) if self._topicArgs: msg += 'topic req\'d args are (%s)' % ', '.join(self._topicArgs) else: msg += 'topic has no required args' # now make sure listener doesn't require params that are optional in TMS: missingDefaultVals = extraArgs.intersection(self._topicKwargs) if missingDefaultVals: msg += ' (params (%s) are req\'d in listener, optional in topic )' % ', '.join(missingDefaultVals) raise ListenerMismatchError(msg, listener, extraArgs)