This document is for py-amqp's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Source code for amqp.serialization

"""Convert between bytestreams and higher-level AMQP types.

2007-11-05 Barry Pederson <bp@barryp.org>

"""
# Copyright (C) 2007 Barry Pederson <bp@barryp.org>

import calendar
from datetime import datetime
from decimal import Decimal
from io import BytesIO
from struct import pack, unpack_from

from .exceptions import FrameSyntaxError
from .spec import Basic
from .utils import bytes_to_str as pstr_t
from .utils import str_to_bytes

ILLEGAL_TABLE_TYPE = """\
    Table type {0!r} not handled by amqp.
"""

ILLEGAL_TABLE_TYPE_WITH_KEY = """\
Table type {0!r} for key {1!r} not handled by amqp. [value: {2!r}]
"""

ILLEGAL_TABLE_TYPE_WITH_VALUE = """\
    Table type {0!r} not handled by amqp. [value: {1!r}]
"""


def _read_item(buf, offset):
    ftype = chr(buf[offset])
    offset += 1

    # 'S': long string
    if ftype == 'S':
        slen, = unpack_from('>I', buf, offset)
        offset += 4
        try:
            val = pstr_t(buf[offset:offset + slen])
        except UnicodeDecodeError:
            val = buf[offset:offset + slen]

        offset += slen
    # 's': short string
    elif ftype == 's':
        slen, = unpack_from('>B', buf, offset)
        offset += 1
        val = pstr_t(buf[offset:offset + slen])
        offset += slen
    # 'x': Bytes Array
    elif ftype == 'x':
        blen, = unpack_from('>I', buf, offset)
        offset += 4
        val = buf[offset:offset + blen]
        offset += blen
    # 'b': short-short int
    elif ftype == 'b':
        val, = unpack_from('>B', buf, offset)
        offset += 1
    # 'B': short-short unsigned int
    elif ftype == 'B':
        val, = unpack_from('>b', buf, offset)
        offset += 1
    # 'U': short int
    elif ftype == 'U':
        val, = unpack_from('>h', buf, offset)
        offset += 2
    # 'u': short unsigned int
    elif ftype == 'u':
        val, = unpack_from('>H', buf, offset)
        offset += 2
    # 'I': long int
    elif ftype == 'I':
        val, = unpack_from('>i', buf, offset)
        offset += 4
    # 'i': long unsigned int
    elif ftype == 'i':
        val, = unpack_from('>I', buf, offset)
        offset += 4
    # 'L': long long int
    elif ftype == 'L':
        val, = unpack_from('>q', buf, offset)
        offset += 8
    # 'l': long long unsigned int
    elif ftype == 'l':
        val, = unpack_from('>Q', buf, offset)
        offset += 8
    # 'f': float
    elif ftype == 'f':
        val, = unpack_from('>f', buf, offset)
        offset += 4
    # 'd': double
    elif ftype == 'd':
        val, = unpack_from('>d', buf, offset)
        offset += 8
    # 'D': decimal
    elif ftype == 'D':
        d, = unpack_from('>B', buf, offset)
        offset += 1
        n, = unpack_from('>i', buf, offset)
        offset += 4
        val = Decimal(n) / Decimal(10 ** d)
    # 'F': table
    elif ftype == 'F':
        tlen, = unpack_from('>I', buf, offset)
        offset += 4
        limit = offset + tlen
        val = {}
        while offset < limit:
            keylen, = unpack_from('>B', buf, offset)
            offset += 1
            key = pstr_t(buf[offset:offset + keylen])
            offset += keylen
            val[key], offset = _read_item(buf, offset)
    # 'A': array
    elif ftype == 'A':
        alen, = unpack_from('>I', buf, offset)
        offset += 4
        limit = offset + alen
        val = []
        while offset < limit:
            v, offset = _read_item(buf, offset)
            val.append(v)
    # 't' (bool)
    elif ftype == 't':
        val, = unpack_from('>B', buf, offset)
        val = bool(val)
        offset += 1
    # 'T': timestamp
    elif ftype == 'T':
        val, = unpack_from('>Q', buf, offset)
        offset += 8
        val = datetime.utcfromtimestamp(val)
    # 'V': void
    elif ftype == 'V':
        val = None
    else:
        raise FrameSyntaxError(
            'Unknown value in table: {!r} ({!r})'.format(
                ftype, type(ftype)))
    return val, offset


[docs]def loads(format, buf, offset): """Deserialize amqp format. bit = b octet = o short = B long = l long long = L float = f shortstr = s longstr = S table = F array = A timestamp = T """ bitcount = bits = 0 values = [] append = values.append format = pstr_t(format) for p in format: if p == 'b': if not bitcount: bits = ord(buf[offset:offset + 1]) offset += 1 bitcount = 8 val = (bits & 1) == 1 bits >>= 1 bitcount -= 1 elif p == 'o': bitcount = bits = 0 val, = unpack_from('>B', buf, offset) offset += 1 elif p == 'B': bitcount = bits = 0 val, = unpack_from('>H', buf, offset) offset += 2 elif p == 'l': bitcount = bits = 0 val, = unpack_from('>I', buf, offset) offset += 4 elif p == 'L': bitcount = bits = 0 val, = unpack_from('>Q', buf, offset) offset += 8 elif p == 'f': bitcount = bits = 0 val, = unpack_from('>f', buf, offset) offset += 4 elif p == 's': bitcount = bits = 0 slen, = unpack_from('B', buf, offset) offset += 1 val = buf[offset:offset + slen].decode('utf-8', 'surrogatepass') offset += slen elif p == 'S': bitcount = bits = 0 slen, = unpack_from('>I', buf, offset) offset += 4 val = buf[offset:offset + slen].decode('utf-8', 'surrogatepass') offset += slen elif p == 'x': blen, = unpack_from('>I', buf, offset) offset += 4 val = buf[offset:offset + blen] offset += blen elif p == 'F': bitcount = bits = 0 tlen, = unpack_from('>I', buf, offset) offset += 4 limit = offset + tlen val = {} while offset < limit: keylen, = unpack_from('>B', buf, offset) offset += 1 key = pstr_t(buf[offset:offset + keylen]) offset += keylen val[key], offset = _read_item(buf, offset) elif p == 'A': bitcount = bits = 0 alen, = unpack_from('>I', buf, offset) offset += 4 limit = offset + alen val = [] while offset < limit: aval, offset = _read_item(buf, offset) val.append(aval) elif p == 'T': bitcount = bits = 0 val, = unpack_from('>Q', buf, offset) offset += 8 val = datetime.utcfromtimestamp(val) else: raise FrameSyntaxError(ILLEGAL_TABLE_TYPE.format(p)) append(val) return values, offset
def _flushbits(bits, write): if bits: write(pack('B' * len(bits), *bits)) bits[:] = [] return 0
[docs]def dumps(format, values): """Serialize AMQP arguments. Notes: bit = b octet = o short = B long = l long long = L shortstr = s longstr = S byte array = x table = F array = A """ bitcount = 0 bits = [] out = BytesIO() write = out.write format = pstr_t(format) for i, val in enumerate(values): p = format[i] if p == 'b': val = 1 if val else 0 shift = bitcount % 8 if shift == 0: bits.append(0) bits[-1] |= (val << shift) bitcount += 1 elif p == 'o': bitcount = _flushbits(bits, write) write(pack('B', val)) elif p == 'B': bitcount = _flushbits(bits, write) write(pack('>H', int(val))) elif p == 'l': bitcount = _flushbits(bits, write) write(pack('>I', val)) elif p == 'L': bitcount = _flushbits(bits, write) write(pack('>Q', val)) elif p == 'f': bitcount = _flushbits(bits, write) write(pack('>f', val)) elif p == 's': val = val or '' bitcount = _flushbits(bits, write) if isinstance(val, str): val = val.encode('utf-8', 'surrogatepass') write(pack('B', len(val))) write(val) elif p == 'S' or p == 'x': val = val or '' bitcount = _flushbits(bits, write) if isinstance(val, str): val = val.encode('utf-8', 'surrogatepass') write(pack('>I', len(val))) write(val) elif p == 'F': bitcount = _flushbits(bits, write) _write_table(val or {}, write, bits) elif p == 'A': bitcount = _flushbits(bits, write) _write_array(val or [], write, bits) elif p == 'T': write(pack('>Q', int(calendar.timegm(val.utctimetuple())))) _flushbits(bits, write) return out.getvalue()
def _write_table(d, write, bits): out = BytesIO() twrite = out.write for k, v in d.items(): if isinstance(k, str): k = k.encode('utf-8', 'surrogatepass') twrite(pack('B', len(k))) twrite(k) try: _write_item(v, twrite, bits) except ValueError: raise FrameSyntaxError( ILLEGAL_TABLE_TYPE_WITH_KEY.format(type(v), k, v)) table_data = out.getvalue() write(pack('>I', len(table_data))) write(table_data) def _write_array(list_, write, bits): out = BytesIO() awrite = out.write for v in list_: try: _write_item(v, awrite, bits) except ValueError: raise FrameSyntaxError( ILLEGAL_TABLE_TYPE_WITH_VALUE.format(type(v), v)) array_data = out.getvalue() write(pack('>I', len(array_data))) write(array_data) def _write_item(v, write, bits): if isinstance(v, (str, bytes)): if isinstance(v, str): v = v.encode('utf-8', 'surrogatepass') write(pack('>cI', b'S', len(v))) write(v) elif isinstance(v, bool): write(pack('>cB', b't', int(v))) elif isinstance(v, float): write(pack('>cd', b'd', v)) elif isinstance(v, int): if v > 2147483647 or v < -2147483647: write(pack('>cq', b'L', v)) else: write(pack('>ci', b'I', v)) elif isinstance(v, Decimal): sign, digits, exponent = v.as_tuple() v = 0 for d in digits: v = (v * 10) + d if sign: v = -v write(pack('>cBi', b'D', -exponent, v)) elif isinstance(v, datetime): write( pack('>cQ', b'T', int(calendar.timegm(v.utctimetuple())))) elif isinstance(v, dict): write(b'F') _write_table(v, write, bits) elif isinstance(v, (list, tuple)): write(b'A') _write_array(v, write, bits) elif v is None: write(b'V') else: raise ValueError()
[docs]def decode_properties_basic(buf, offset): """Decode basic properties.""" properties = {} flags, = unpack_from('>H', buf, offset) offset += 2 if flags & 0x8000: slen, = unpack_from('>B', buf, offset) offset += 1 properties['content_type'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x4000: slen, = unpack_from('>B', buf, offset) offset += 1 properties['content_encoding'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x2000: _f, offset = loads('F', buf, offset) properties['application_headers'], = _f if flags & 0x1000: properties['delivery_mode'], = unpack_from('>B', buf, offset) offset += 1 if flags & 0x0800: properties['priority'], = unpack_from('>B', buf, offset) offset += 1 if flags & 0x0400: slen, = unpack_from('>B', buf, offset) offset += 1 properties['correlation_id'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x0200: slen, = unpack_from('>B', buf, offset) offset += 1 properties['reply_to'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x0100: slen, = unpack_from('>B', buf, offset) offset += 1 properties['expiration'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x0080: slen, = unpack_from('>B', buf, offset) offset += 1 properties['message_id'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x0040: properties['timestamp'], = unpack_from('>Q', buf, offset) offset += 8 if flags & 0x0020: slen, = unpack_from('>B', buf, offset) offset += 1 properties['type'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x0010: slen, = unpack_from('>B', buf, offset) offset += 1 properties['user_id'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x0008: slen, = unpack_from('>B', buf, offset) offset += 1 properties['app_id'] = pstr_t(buf[offset:offset + slen]) offset += slen if flags & 0x0004: slen, = unpack_from('>B', buf, offset) offset += 1 properties['cluster_id'] = pstr_t(buf[offset:offset + slen]) offset += slen return properties, offset
PROPERTY_CLASSES = { Basic.CLASS_ID: decode_properties_basic, }
[docs]class GenericContent: """Abstract base class for AMQP content. Subclasses should override the PROPERTIES attribute. """ CLASS_ID = None PROPERTIES = [('dummy', 's')] def __init__(self, frame_method=None, frame_args=None, **props): self.frame_method = frame_method self.frame_args = frame_args self.properties = props self._pending_chunks = [] self.body_received = 0 self.body_size = 0 self.ready = False __slots__ = ( "frame_method", "frame_args", "properties", "_pending_chunks", "body_received", "body_size", "ready", # adding '__dict__' to get dynamic assignment "__dict__", "__weakref__", ) def __getattr__(self, name): # Look for additional properties in the 'properties' # dictionary, and if present - the 'delivery_info' dictionary. if name == '__setstate__': # Allows pickling/unpickling to work raise AttributeError('__setstate__') if name in self.properties: return self.properties[name] raise AttributeError(name) def _load_properties(self, class_id, buf, offset): """Load AMQP properties. Given the raw bytes containing the property-flags and property-list from a content-frame-header, parse and insert into a dictionary stored in this object as an attribute named 'properties'. """ # Read 16-bit shorts until we get one with a low bit set to zero props, offset = PROPERTY_CLASSES[class_id](buf, offset) self.properties = props return offset def _serialize_properties(self): """Serialize AMQP properties. Serialize the 'properties' attribute (a dictionary) into the raw bytes making up a set of property flags and a property list, suitable for putting into a content frame header. """ shift = 15 flag_bits = 0 flags = [] sformat, svalues = [], [] props = self.properties for key, proptype in self.PROPERTIES: val = props.get(key, None) if val is not None: if shift == 0: flags.append(flag_bits) flag_bits = 0 shift = 15 flag_bits |= (1 << shift) if proptype != 'bit': sformat.append(str_to_bytes(proptype)) svalues.append(val) shift -= 1 flags.append(flag_bits) result = BytesIO() write = result.write for flag_bits in flags: write(pack('>H', flag_bits)) write(dumps(b''.join(sformat), svalues)) return result.getvalue()
[docs] def inbound_header(self, buf, offset=0): class_id, self.body_size = unpack_from('>HxxQ', buf, offset) offset += 12 self._load_properties(class_id, buf, offset) if not self.body_size: self.ready = True return offset
[docs] def inbound_body(self, buf): chunks = self._pending_chunks self.body_received += len(buf) if self.body_received >= self.body_size: if chunks: chunks.append(buf) self.body = bytes().join(chunks) chunks[:] = [] else: self.body = buf self.ready = True else: chunks.append(buf)