"""Code common to Connection and Channel objects."""
# Copyright (C) 2007-2008 Barry Pederson <>)
from __future__ import absolute_import, unicode_literals

from vine import ensure_promise, promise

from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
from .five import bytes_if_py2
from .serialization import dumps, loads

__all__ = ['AbstractChannel']

[docs]class AbstractChannel(object): """Superclass for Connection and Channel. The connection is treated as channel 0, then comes user-created channel objects. The subclasses must have a _METHOD_MAP class property, mapping between AMQP method signatures and Python methods. """ def __init__(self, connection, channel_id): self.is_closing = False self.connection = connection self.channel_id = channel_id connection.channels[channel_id] = self self.method_queue = [] # Higher level queue for methods self.auto_decode = False self._pending = {} self._callbacks = {} self._setup_listeners() def __enter__(self): return self def __exit__(self, *exc_info): self.close()
[docs] def send_method(self, sig, format=None, args=None, content=None, wait=None, callback=None, returns_tuple=False): p = promise() conn = self.connection if conn is None: raise RecoverableConnectionError('connection already closed') args = dumps(format, args) if format else bytes_if_py2('') try: conn.frame_writer(1, self.channel_id, sig, args, content) except StopIteration: raise RecoverableConnectionError('connection already closed') # TODO temp: callback should be after write_method ... ;) if callback: p.then(callback) p() if wait: return self.wait(wait, returns_tuple=returns_tuple) return p
[docs] def close(self): """Close this Channel or Connection.""" raise NotImplementedError('Must be overriden in subclass')
[docs] def wait(self, method, callback=None, timeout=None, returns_tuple=False): p = ensure_promise(callback) pending = self._pending prev_p = [] if not isinstance(method, list): method = [method] for m in method: prev_p.append(pending.get(m)) pending[m] = p try: while not p.ready: self.connection.drain_events(timeout=timeout) if p.value: args, kwargs = p.value args = args[1:] # We are not returning method back return args if returns_tuple else (args and args[0]) finally: for i, m in enumerate(method): if prev_p[i] is not None: pending[m] = prev_p[i] else: pending.pop(m, None)
[docs] def dispatch_method(self, method_sig, payload, content): if content and \ self.auto_decode and \ hasattr(content, 'content_encoding'): try: content.body = content.body.decode(content.content_encoding) except Exception: pass try: amqp_method = self._METHODS[method_sig] except KeyError: raise AMQPNotImplementedError( 'Unknown AMQP method {0!r}'.format(method_sig)) try: listeners = [self._callbacks[method_sig]] except KeyError: listeners = [] one_shot = None try: one_shot = self._pending.pop(method_sig) except KeyError: if not listeners: return args = [] if amqp_method.args: args, _ = loads(amqp_method.args, payload, 4) if amqp_method.content: args.append(content) for listener in listeners: listener(*args) if one_shot: one_shot(method_sig, *args)
#: Placeholder, the concrete implementations will have to #: supply their own versions of _METHOD_MAP _METHODS = {}