Source code for tornadose.stores

"""Data storage for dynamic updates to clients."""

import logging
from tornado import gen
from tornado.web import RequestHandler
from tornado.queues import Queue
from tornado.locks import Event

logger = logging.getLogger('tornadose.stores')


[docs]class BaseStore(object): """Base class for all data store types. At a minimum, derived classes should implement ``submit`` and ``publish`` methods. """ def __init__(self, *args, **kwargs): self.subscribers = set() self.initialize(*args, **kwargs)
[docs] def initialize(self, *args, **kwargs): """Hook for doing custom initialization. Child classes should implement this method instead of overwriting ``__init__``. """
[docs] def register(self, subscriber): """Register a new subscriber. This method should be invoked by listeners to start receiving messages. """ assert isinstance(subscriber, RequestHandler) logger.debug('New subscriber') self.subscribers.add(subscriber)
[docs] def deregister(self, subscriber): """Stop publishing to a subscriber.""" try: logger.debug('Subscriber left') self.subscribers.remove(subscriber) except KeyError: logger.debug( 'Error removing subscriber: ' + str(subscriber))
[docs] def submit(self, message): """Add a new message to be pushed to subscribers. This method must be implemented by child classes. This method exists to store new data. To actually publish the data, implement the ``publish`` method. """ raise NotImplementedError('submit must be implemented!')
[docs] def publish(self): """Push messages to all listeners. This method must be implemented by child classes. A recommended way to implement this method is as a looping coroutine which yields until new data is available via the :meth:`submit` method. """ raise NotImplementedError('publish must be implemented!')
[docs]class DataStore(BaseStore): """Generic object for producing data to feed to clients. To use this, simply instantiate and update the ``data`` property whenever new data is available. When creating a new :class:`EventSource` handler, specify the :class:`DataStore` instance so that the :class:`EventSource` can listen for updates. """ def initialize(self, initial_data=None): self.ready = Event() self.set_data(initial_data) self.publish() def set_data(self, new_data): """Update the store with new data.""" self._data = new_data self.ready.set() @property def data(self): return self._data @data.setter def data(self, new_data): self.set_data(new_data) def submit(self, message): self.data = str(message) @gen.coroutine def publish(self): while True: yield self.ready.wait() yield [subscriber.submit(self.data) for subscriber in self.subscribers] self.ready.clear()
[docs]class QueueStore(BaseStore): """Publish data via queues. This class is meant to be used in cases where subscribers should not miss any data. Compared to the :class:`DataStore` class, new messages to be broadcast to clients are put in a queue to be processed in order. :class:`QueueStore` will work with any :class:`tornado.web.RequestHandler` subclasses which implement a ``submit`` method. It is recommended that a custom subscription handler's :meth:`submit` method also utilize a queue to avoid losing data. The subscriber must also register/deregister itself with the :class:`QueueStore` via the :meth:`QueueStore.register` and :meth:`QueueStore.deregister` methods. A :class:`QueueStore`-compatible request handler is included in :class:`tornadose.handlers.WebSocketSubscriber`. """ def initialize(self): self.messages = Queue() self.publish() @gen.coroutine def submit(self, message): yield self.messages.put(message) @gen.coroutine def publish(self): while True: message = yield self.messages.get() if len(self.subscribers) > 0: print("Pushing message {} to {} subscribers...".format( message, len(self.subscribers))) yield [subscriber.submit(message) for subscriber in self.subscribers]