Source code for tornadose.handlers

"""Custom request handlers for pushing data to connected clients."""

import logging
from tornado import gen
from tornado.web import RequestHandler
from tornado.websocket import WebSocketHandler, WebSocketClosedError
from tornado.queues import Queue
from tornado.iostream import StreamClosedError
from tornado.log import access_log
from . import stores

logger = logging.getLogger('tornadose.handlers')


[docs]class BaseHandler(RequestHandler): """Base handler for subscribers. To be compatible with data stores defined in :mod:`tornadose.stores`, custom handlers should inherit this class and implement the :meth:`submit` and :meth:`publish` methods. """ def initialize(self, store): """Common initialization of handlers happens here. If additional initialization is required, this method must either be called with ``super`` or the child class must assign the ``store`` attribute and register itself with the store. """ assert isinstance(store, stores.BaseStore) self.store = store self.store.register(self) def submit(self, message): """Submit a new message to be published. This method must be implemented by child classes. """ raise NotImplementedError('submit must be implemented!') def publish(self): """Push a message to the subscriber. This method must be implemented by child classes. """ raise NotImplementedError('publish must be implemented!')
[docs]class EventSource(BaseHandler): """Handler for server-sent events a.k.a. EventSource. The EventSource__ interface has a few advantages over websockets: * It is a normal HTTP connection and so can be more easily monitored than websockets using tools like curl__ or HTTPie__. * Browsers generally automatically try to reestablish a lost connection. * The publish/subscribe pattern is better suited to some applications than the full duplex model of websockets. __ https://developer.mozilla.org/en-US/docs/Web/API/EventSource __ http://curl.haxx.se/ __ https://github.com/jkbrzt/httpie """
[docs] def initialize(self, store, period=None): """If ``period`` is given, publishers will sleep for approximately the given time in order to throttle data speeds. """ super(EventSource, self).initialize(store) assert isinstance(period, (int, float)) or period is None self.period = period self.finished = False self.set_header('content-type', 'text/event-stream') self.set_header('cache-control', 'no-cache')
def prepare(self): """Log access.""" request_time = 1000.0 * self.request.request_time() access_log.info( "%d %s %.2fms", self.get_status(), self._request_summary(), request_time) @gen.coroutine def submit(self, message): """Receive incoming data.""" logger.debug('Incoming message: ' + message) yield self.publish(message) @gen.coroutine
[docs] def publish(self, message): """Pushes data to a listener.""" try: self.write('data: {}\n\n'.format(message)) yield self.flush() except StreamClosedError: self.finished = True
@gen.coroutine def get(self, *args, **kwargs): try: while not self.finished: if self.period is not None: yield gen.sleep(self.period) else: yield gen.moment except Exception: pass finally: self.store.deregister(self) self.finish()
[docs]class WebSocketSubscriber(BaseHandler, WebSocketHandler): """A Websocket-based subscription handler to be used with :class:`tornadose.stores.QueueStore`. """ def initialize(self, store): super(WebSocketSubscriber, self).initialize(store) self.messages = Queue() self.finished = False @gen.coroutine
[docs] def open(self): """Register with the publisher.""" self.store.register(self) while not self.finished: message = yield self.messages.get() yield self.publish(message)
def on_close(self): self._close() def _close(self): self.store.deregister(self) self.finished = True @gen.coroutine def submit(self, message): yield self.messages.put(message) @gen.coroutine
[docs] def publish(self, message): """Push a new message to the client. The data will be available as a JSON object with the key ``data``. """ try: self.write_message(dict(data=message)) except WebSocketClosedError: self._close()