from . import logger
from threading import Lock
import sys
from types import FunctionType
if sys.version_info < (3, 0):
import Queue as queue
from Queue import Empty
else:
import queue
from queue import Empty
topics = {}
lock = Lock()
[docs]def publish(topic, *args, **kwargs):
"""
publish arguments to a topic.
:param topic: topic to publish the message to
:type topic: str
:param args: unnamed arguments
:param kwargs: named arguments
:return: False if no Subscribers, else True
"""
t = _get_topic(topic)
logger.debug('got subscribers in topic: %s' % t['subscribers'])
if not t['subscribers']:
logger.error('published to topic %s with no subscribers' % topic)
return False
with lock:
for s in t['subscribers']:
logger.debug('published message on topic %s: %s %s' % (topic, args, kwargs))
s._put_message((topic, args, kwargs))
return True
def _get_topic(topic):
"""
get a topic from the topics dictionary
will be created new, with no subscribers, if there is none.
:param topic:
:return:
"""
t = topics.get(topic, None)
if not t:
topics[topic] = _new_topic()
logger.debug('new topic %s' % (topic))
t = topics[topic]
return t
def _new_topic():
"""
create an empty dictionary with subscribers. used by _get_topic()
:return: empty subscribers dict
"""
return {
'subscribers': []
}
[docs]class Subscriber:
def __init__(self, name='', autosubscribe=False):
"""
Base Class for IPC
all Subclasses inherit Queues and basic scheduling functions
see the __init__.py of this module for example usage.
:param name: Optional, but used in debugging
:param autosubscribe: if True, subscribe to all functionnames starting with on_, without on_ ("on_topic()" would subscribe to "topic")
:return:
"""
self.queue = queue.Queue()
self.name = name
if autosubscribe:
listen_to = [x for x, y in self.__class__.__dict__.items() if
(type(y) == FunctionType and x.startswith('on_'))]
for l in listen_to:
self.subscribe(l.split('on_')[1])
[docs] def subscribe(self, topic):
"""
manually subscribe a topic
:param topic:
:return:
"""
t = _get_topic(topic)
with lock:
if self not in t['subscribers']:
logger.info('%s subscribed to %s' % (self.name, topic))
t['subscribers'].append(self)
return True
else:
logger.error('already subscribed to %s' % topic)
return False
def _put_message(self, topic, *args, **kwargs):
"""
put a Message in the Queue.
called by the publish-function
:param topic:
:param args:
:param kwargs:
:return:
"""
self.queue.put(topic, *args, **kwargs)
[docs] def has_messages(self):
"""
:return: True if Queue not Empty, otherwise False
"""
return self.queue.qsize() != 0
[docs] def get_message(self, timeout=0.1):
"""
get topic, arguments and names arguments
:param timeout:
:return: topic, args, kwargs
:rtype: str, list, dict
"""
try:
(topic, args, kwargs) = self.queue.get(block=True, timeout=timeout)
return topic, args, kwargs
except Empty:
return False
[docs] def publish(self, topic, *args, **kwargs):
"""
just for nicer logging output. calls the regular publish via this method
:param topic:
:param args:
:param kwargs:
:return:
"""
logger.info('%s is publishing on topic %s: %s, %s' % (self.name, topic, args, kwargs))
ret = publish(topic, *args, **kwargs)
if not ret:
logger.error('error at %s' % self)
def __del__(self):
with lock:
if topics:
for t in topics:
if self in t['subscribers']:
t['subscribers'].remove(self)
logger.debug('removed self from %s while deleting' % t)
def __repr__(self):
return self.name