Source code for bitween.components.pubsub.autosubscriber

"""
Democlass for autosubscribing

needed for testing
"""


from .pubsub import Subscriber

[docs]class AutoSub(Subscriber): """ Democlass for autosubscribing needed for testing """ def __init__(self): Subscriber.__init__(self, autosubscribe=True)
[docs] def process_messages(self): """ process messages since this class autosubsribes topics, topic "some_topic" will resolve to "on_some_topic()" :return: """ if self.has_messages(): topic, args, kwargs = self.get_message() try: f = getattr(self, 'on_%s' % topic) f(*args, **kwargs) except Exception as e: print('something went wrong when calling on_%s: %s' % (topic, e))
[docs] def on_some_topic(self, some_string, some_int=1): print('some_string is %s' % some_string) print('some_int is %s' % some_int)