http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d28fecf5/python/proton/__init__.py ---------------------------------------------------------------------- diff --git a/python/proton/__init__.py b/python/proton/__init__.py index 6ee0d68..be8e247 100644 --- a/python/proton/__init__.py +++ b/python/proton/__init__.py @@ -30,54 +30,84 @@ The proton APIs consist of the following classes: """ from __future__ import absolute_import -from cproton import * -from .wrapper import Wrapper -from . import _compat - import logging -import socket -import sys -import threading -import uuid -import weakref - -# This private NullHandler is required for Python 2.6, -# when we no longer support 2.6 this replace NullHandler class definition and assignment with: -# handler = logging.NullHandler() -class NullHandler(logging.Handler): - def handle(self, record): - pass - - def emit(self, record): - pass - - def createLock(self): - self.lock = None - -handler = NullHandler() - -log = logging.getLogger("proton") -log.addHandler(handler) -def generate_uuid(): - return uuid.uuid4() +from cproton import PN_VERSION_MAJOR, PN_VERSION_MINOR, PN_VERSION_POINT -# -# Hacks to provide Python2 <---> Python3 compatibility -# -# The results are -# | |long|unicode| -# |python2|long|unicode| -# |python3| int| str| -try: - long() -except NameError: - long = int -try: - unicode() -except NameError: - unicode = str +from ._condition import Condition +from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, timestamp, ubyte, ushort, uint, ulong, \ + byte, short, int32, float32, decimal32, decimal64, decimal128 +from ._delivery import Delivery, Disposition +from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, Terminus +from ._events import Collector, Event, EventType, Handler +from ._exceptions import ProtonException, MessageException, DataException, TransportException, \ + SSLException, SSLUnavailable, ConnectionException, SessionException, LinkException, Timeout, Interrupt +from ._message import Message, ABORTED, ACCEPTED, PENDING, REJECTED, RELEASED, MODIFIED, SETTLED +from ._transport import Transport, SASL, SSL, SSLDomain, SSLSessionDetails +from ._url import Url +__all__ = [ + "API_LANGUAGE", + "IMPLEMENTATION_LANGUAGE", + "ABORTED", + "ACCEPTED", + "PENDING", + "REJECTED", + "RELEASED", + "MODIFIED", + "SETTLED", + "UNDESCRIBED", + "Array", + "Collector", + "Condition", + "Connection", + "Data", + "DataException", + "Delivery", + "Disposition", + "Described", + "Endpoint", + "Event", + "EventType", + "Handler", + "Link", + "LinkException", + "Message", + "MessageException", + "ProtonException", + "VERSION_MAJOR", + "VERSION_MINOR", + "Receiver", + "SASL", + "Sender", + "Session", + "SessionException", + "SSL", + "SSLDomain", + "SSLSessionDetails", + "SSLUnavailable", + "SSLException", + "Terminus", + "Timeout", + "Interrupt", + "Transport", + "TransportException", + "Url", + "char", + "symbol", + "timestamp", + "ulong", + "byte", + "short", + "int32", + "ubyte", + "ushort", + "uint", + "float32", + "decimal32", + "decimal64", + "decimal128" +] VERSION_MAJOR = PN_VERSION_MAJOR VERSION_MINOR = PN_VERSION_MINOR @@ -86,3603 +116,27 @@ VERSION = (VERSION_MAJOR, VERSION_MINOR, VERSION_POINT) API_LANGUAGE = "C" IMPLEMENTATION_LANGUAGE = "C" -class Constant(object): - - def __init__(self, name): - self.name = name - - def __repr__(self): - return self.name - -class ProtonException(Exception): - """ - The root of the proton exception hierarchy. All proton exception - classes derive from this exception. - """ - pass - -class Timeout(ProtonException): - """ - A timeout exception indicates that a blocking operation has timed - out. - """ - pass - -class Interrupt(ProtonException): - """ - An interrupt exception indicates that a blocking operation was interrupted. - """ - pass - -class MessageException(ProtonException): - """ - The MessageException class is the root of the message exception - hierarchy. All exceptions generated by the Message class derive from - this exception. - """ - pass - -EXCEPTIONS = { - PN_TIMEOUT: Timeout, - PN_INTR: Interrupt - } - -PENDING = Constant("PENDING") -ACCEPTED = Constant("ACCEPTED") -REJECTED = Constant("REJECTED") -RELEASED = Constant("RELEASED") -MODIFIED = Constant("MODIFIED") -ABORTED = Constant("ABORTED") -SETTLED = Constant("SETTLED") - -STATUSES = { - PN_STATUS_ABORTED: ABORTED, - PN_STATUS_ACCEPTED: ACCEPTED, - PN_STATUS_REJECTED: REJECTED, - PN_STATUS_RELEASED: RELEASED, - PN_STATUS_MODIFIED: MODIFIED, - PN_STATUS_PENDING: PENDING, - PN_STATUS_SETTLED: SETTLED, - PN_STATUS_UNKNOWN: None - } - -class Message(object): - """The L{Message} class is a mutable holder of message content. - - @ivar instructions: delivery instructions for the message - @type instructions: dict - @ivar annotations: infrastructure defined message annotations - @type annotations: dict - @ivar properties: application defined message properties - @type properties: dict - @ivar body: message body - @type body: bytes | unicode | dict | list | int | long | float | UUID - """ - - DEFAULT_PRIORITY = PN_DEFAULT_PRIORITY - - def __init__(self, body=None, **kwargs): - """ - @param kwargs: Message property name/value pairs to initialise the Message - """ - self._msg = pn_message() - self._id = Data(pn_message_id(self._msg)) - self._correlation_id = Data(pn_message_correlation_id(self._msg)) - self.instructions = None - self.annotations = None - self.properties = None - self.body = body - for k,v in _compat.iteritems(kwargs): - getattr(self, k) # Raise exception if it's not a valid attribute. - setattr(self, k, v) - - def __del__(self): - if hasattr(self, "_msg"): - pn_message_free(self._msg) - del self._msg - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, MessageException) - raise exc("[%s]: %s" % (err, pn_error_text(pn_message_error(self._msg)))) - else: - return err - - def _check_property_keys(self): - for k in self.properties.keys(): - if isinstance(k, unicode): - # py2 unicode, py3 str (via hack definition) - continue - # If key is binary then change to string - elif isinstance(k, str): - # py2 str - self.properties[k.encode('utf-8')] = self.properties.pop(k) - else: - raise MessageException('Application property key is not string type: key=%s %s' % (str(k), type(k))) - - def _pre_encode(self): - inst = Data(pn_message_instructions(self._msg)) - ann = Data(pn_message_annotations(self._msg)) - props = Data(pn_message_properties(self._msg)) - body = Data(pn_message_body(self._msg)) - - inst.clear() - if self.instructions is not None: - inst.put_object(self.instructions) - ann.clear() - if self.annotations is not None: - ann.put_object(self.annotations) - props.clear() - if self.properties is not None: - self._check_property_keys() - props.put_object(self.properties) - body.clear() - if self.body is not None: - body.put_object(self.body) - - def _post_decode(self): - inst = Data(pn_message_instructions(self._msg)) - ann = Data(pn_message_annotations(self._msg)) - props = Data(pn_message_properties(self._msg)) - body = Data(pn_message_body(self._msg)) - - if inst.next(): - self.instructions = inst.get_object() - else: - self.instructions = None - if ann.next(): - self.annotations = ann.get_object() - else: - self.annotations = None - if props.next(): - self.properties = props.get_object() - else: - self.properties = None - if body.next(): - self.body = body.get_object() - else: - self.body = None - - def clear(self): - """ - Clears the contents of the L{Message}. All fields will be reset to - their default values. - """ - pn_message_clear(self._msg) - self.instructions = None - self.annotations = None - self.properties = None - self.body = None - - def _is_inferred(self): - return pn_message_is_inferred(self._msg) - - def _set_inferred(self, value): - self._check(pn_message_set_inferred(self._msg, bool(value))) - - inferred = property(_is_inferred, _set_inferred, doc=""" -The inferred flag for a message indicates how the message content -is encoded into AMQP sections. If inferred is true then binary and -list values in the body of the message will be encoded as AMQP DATA -and AMQP SEQUENCE sections, respectively. If inferred is false, -then all values in the body of the message will be encoded as AMQP -VALUE sections regardless of their type. -""") - - def _is_durable(self): - return pn_message_is_durable(self._msg) - - def _set_durable(self, value): - self._check(pn_message_set_durable(self._msg, bool(value))) - - durable = property(_is_durable, _set_durable, - doc=""" -The durable property indicates that the message should be held durably -by any intermediaries taking responsibility for the message. -""") - - def _get_priority(self): - return pn_message_get_priority(self._msg) - - def _set_priority(self, value): - self._check(pn_message_set_priority(self._msg, value)) - - priority = property(_get_priority, _set_priority, - doc=""" -The priority of the message. -""") - - def _get_ttl(self): - return millis2secs(pn_message_get_ttl(self._msg)) - - def _set_ttl(self, value): - self._check(pn_message_set_ttl(self._msg, secs2millis(value))) - - ttl = property(_get_ttl, _set_ttl, - doc=""" -The time to live of the message measured in seconds. Expired messages -may be dropped. -""") - - def _is_first_acquirer(self): - return pn_message_is_first_acquirer(self._msg) - - def _set_first_acquirer(self, value): - self._check(pn_message_set_first_acquirer(self._msg, bool(value))) - - first_acquirer = property(_is_first_acquirer, _set_first_acquirer, - doc=""" -True iff the recipient is the first to acquire the message. -""") - - def _get_delivery_count(self): - return pn_message_get_delivery_count(self._msg) - - def _set_delivery_count(self, value): - self._check(pn_message_set_delivery_count(self._msg, value)) - - delivery_count = property(_get_delivery_count, _set_delivery_count, - doc=""" -The number of delivery attempts made for this message. -""") - - - def _get_id(self): - return self._id.get_object() - def _set_id(self, value): - if type(value) in (int, long): - value = ulong(value) - self._id.rewind() - self._id.put_object(value) - id = property(_get_id, _set_id, - doc=""" -The id of the message. -""") - - def _get_user_id(self): - return pn_message_get_user_id(self._msg) - - def _set_user_id(self, value): - self._check(pn_message_set_user_id(self._msg, value)) - - user_id = property(_get_user_id, _set_user_id, - doc=""" -The user id of the message creator. -""") - - def _get_address(self): - return utf82unicode(pn_message_get_address(self._msg)) - - def _set_address(self, value): - self._check(pn_message_set_address(self._msg, unicode2utf8(value))) - - address = property(_get_address, _set_address, - doc=""" -The address of the message. -""") - - def _get_subject(self): - return utf82unicode(pn_message_get_subject(self._msg)) - - def _set_subject(self, value): - self._check(pn_message_set_subject(self._msg, unicode2utf8(value))) - - subject = property(_get_subject, _set_subject, - doc=""" -The subject of the message. -""") - - def _get_reply_to(self): - return utf82unicode(pn_message_get_reply_to(self._msg)) - - def _set_reply_to(self, value): - self._check(pn_message_set_reply_to(self._msg, unicode2utf8(value))) - - reply_to = property(_get_reply_to, _set_reply_to, - doc=""" -The reply-to address for the message. -""") - - def _get_correlation_id(self): - return self._correlation_id.get_object() - def _set_correlation_id(self, value): - if type(value) in (int, long): - value = ulong(value) - self._correlation_id.rewind() - self._correlation_id.put_object(value) - - correlation_id = property(_get_correlation_id, _set_correlation_id, - doc=""" -The correlation-id for the message. -""") - - def _get_content_type(self): - return symbol(utf82unicode(pn_message_get_content_type(self._msg))) - - def _set_content_type(self, value): - self._check(pn_message_set_content_type(self._msg, unicode2utf8(value))) - - content_type = property(_get_content_type, _set_content_type, - doc=""" -The content-type of the message. -""") - - def _get_content_encoding(self): - return symbol(utf82unicode(pn_message_get_content_encoding(self._msg))) - - def _set_content_encoding(self, value): - self._check(pn_message_set_content_encoding(self._msg, unicode2utf8(value))) - - content_encoding = property(_get_content_encoding, _set_content_encoding, - doc=""" -The content-encoding of the message. -""") - - def _get_expiry_time(self): - return millis2secs(pn_message_get_expiry_time(self._msg)) - - def _set_expiry_time(self, value): - self._check(pn_message_set_expiry_time(self._msg, secs2millis(value))) - - expiry_time = property(_get_expiry_time, _set_expiry_time, - doc=""" -The expiry time of the message. -""") - - def _get_creation_time(self): - return millis2secs(pn_message_get_creation_time(self._msg)) - - def _set_creation_time(self, value): - self._check(pn_message_set_creation_time(self._msg, secs2millis(value))) - - creation_time = property(_get_creation_time, _set_creation_time, - doc=""" -The creation time of the message. -""") - - def _get_group_id(self): - return utf82unicode(pn_message_get_group_id(self._msg)) - - def _set_group_id(self, value): - self._check(pn_message_set_group_id(self._msg, unicode2utf8(value))) - - group_id = property(_get_group_id, _set_group_id, - doc=""" -The group id of the message. -""") - - def _get_group_sequence(self): - return pn_message_get_group_sequence(self._msg) - - def _set_group_sequence(self, value): - self._check(pn_message_set_group_sequence(self._msg, value)) - - group_sequence = property(_get_group_sequence, _set_group_sequence, - doc=""" -The sequence of the message within its group. -""") - - def _get_reply_to_group_id(self): - return utf82unicode(pn_message_get_reply_to_group_id(self._msg)) - - def _set_reply_to_group_id(self, value): - self._check(pn_message_set_reply_to_group_id(self._msg, unicode2utf8(value))) - - reply_to_group_id = property(_get_reply_to_group_id, _set_reply_to_group_id, - doc=""" -The group-id for any replies. -""") - - def encode(self): - self._pre_encode() - sz = 16 - while True: - err, data = pn_message_encode(self._msg, sz) - if err == PN_OVERFLOW: - sz *= 2 - continue - else: - self._check(err) - return data - - def decode(self, data): - self._check(pn_message_decode(self._msg, data)) - self._post_decode() - - def send(self, sender, tag=None): - dlv = sender.delivery(tag or sender.delivery_tag()) - encoded = self.encode() - sender.stream(encoded) - sender.advance() - if sender.snd_settle_mode == Link.SND_SETTLED: - dlv.settle() - return dlv - - def recv(self, link): - """ - Receives and decodes the message content for the current delivery - from the link. Upon success it will return the current delivery - for the link. If there is no current delivery, or if the current - delivery is incomplete, or if the link is not a receiver, it will - return None. - - @type link: Link - @param link: the link to receive a message from - @return the delivery associated with the decoded message (or None) - - """ - if link.is_sender: return None - dlv = link.current - if not dlv or dlv.partial: return None - dlv.encoded = link.recv(dlv.pending) - link.advance() - # the sender has already forgotten about the delivery, so we might - # as well too - if link.remote_snd_settle_mode == Link.SND_SETTLED: - dlv.settle() - self.decode(dlv.encoded) - return dlv - - def __repr2__(self): - props = [] - for attr in ("inferred", "address", "reply_to", "durable", "ttl", - "priority", "first_acquirer", "delivery_count", "id", - "correlation_id", "user_id", "group_id", "group_sequence", - "reply_to_group_id", "instructions", "annotations", - "properties", "body"): - value = getattr(self, attr) - if value: props.append("%s=%r" % (attr, value)) - return "Message(%s)" % ", ".join(props) - - def __repr__(self): - tmp = pn_string(None) - err = pn_inspect(self._msg, tmp) - result = pn_string_get(tmp) - pn_free(tmp) - self._check(err) - return result - -_DEFAULT = object() - -class Selectable(Wrapper): - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Selectable(impl) - - def __init__(self, impl): - Wrapper.__init__(self, impl, pn_selectable_attachments) - - def _init(self): - pass - - def fileno(self, fd = _DEFAULT): - if fd is _DEFAULT: - return pn_selectable_get_fd(self._impl) - elif fd is None: - pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET) - else: - pn_selectable_set_fd(self._impl, fd) - - def _is_reading(self): - return pn_selectable_is_reading(self._impl) - - def _set_reading(self, val): - pn_selectable_set_reading(self._impl, bool(val)) - - reading = property(_is_reading, _set_reading) - - def _is_writing(self): - return pn_selectable_is_writing(self._impl) - - def _set_writing(self, val): - pn_selectable_set_writing(self._impl, bool(val)) - - writing = property(_is_writing, _set_writing) - - def _get_deadline(self): - tstamp = pn_selectable_get_deadline(self._impl) - if tstamp: - return millis2secs(tstamp) - else: - return None - - def _set_deadline(self, deadline): - pn_selectable_set_deadline(self._impl, secs2millis(deadline)) - - deadline = property(_get_deadline, _set_deadline) - - def readable(self): - pn_selectable_readable(self._impl) - - def writable(self): - pn_selectable_writable(self._impl) - - def expired(self): - pn_selectable_expired(self._impl) - - def _is_registered(self): - return pn_selectable_is_registered(self._impl) - - def _set_registered(self, registered): - pn_selectable_set_registered(self._impl, registered) - - registered = property(_is_registered, _set_registered, - doc=""" -The registered property may be get/set by an I/O polling system to -indicate whether the fd has been registered or not. -""") - - @property - def is_terminal(self): - return pn_selectable_is_terminal(self._impl) - - def terminate(self): - pn_selectable_terminate(self._impl) - - def release(self): - pn_selectable_release(self._impl) - -class DataException(ProtonException): - """ - The DataException class is the root of the Data exception hierarchy. - All exceptions raised by the Data class extend this exception. - """ - pass - -class UnmappedType: - - def __init__(self, msg): - self.msg = msg - - def __repr__(self): - return "UnmappedType(%s)" % self.msg -class ulong(long): - - def __repr__(self): - return "ulong(%s)" % long.__repr__(self) - -class timestamp(long): - - def __repr__(self): - return "timestamp(%s)" % long.__repr__(self) - -class symbol(unicode): - - def __repr__(self): - return "symbol(%s)" % unicode.__repr__(self) - -class char(unicode): - - def __repr__(self): - return "char(%s)" % unicode.__repr__(self) - -class byte(int): - - def __repr__(self): - return "byte(%s)" % int.__repr__(self) - -class short(int): - - def __repr__(self): - return "short(%s)" % int.__repr__(self) - -class int32(int): - - def __repr__(self): - return "int32(%s)" % int.__repr__(self) - -class ubyte(int): - - def __repr__(self): - return "ubyte(%s)" % int.__repr__(self) - -class ushort(int): - - def __repr__(self): - return "ushort(%s)" % int.__repr__(self) - -class uint(long): - - def __repr__(self): - return "uint(%s)" % long.__repr__(self) - -class float32(float): - - def __repr__(self): - return "float32(%s)" % float.__repr__(self) - -class decimal32(int): - - def __repr__(self): - return "decimal32(%s)" % int.__repr__(self) - -class decimal64(long): - - def __repr__(self): - return "decimal64(%s)" % long.__repr__(self) - -class decimal128(bytes): - - def __repr__(self): - return "decimal128(%s)" % bytes.__repr__(self) - -class Described(object): - - def __init__(self, descriptor, value): - self.descriptor = descriptor - self.value = value - - def __repr__(self): - return "Described(%r, %r)" % (self.descriptor, self.value) - - def __eq__(self, o): - if isinstance(o, Described): - return self.descriptor == o.descriptor and self.value == o.value - else: - return False - -UNDESCRIBED = Constant("UNDESCRIBED") - -class Array(object): - - def __init__(self, descriptor, type, *elements): - self.descriptor = descriptor - self.type = type - self.elements = elements - - def __iter__(self): - return iter(self.elements) - - def __repr__(self): - if self.elements: - els = ", %s" % (", ".join(map(repr, self.elements))) - else: - els = "" - return "Array(%r, %r%s)" % (self.descriptor, self.type, els) - - def __eq__(self, o): - if isinstance(o, Array): - return self.descriptor == o.descriptor and \ - self.type == o.type and self.elements == o.elements - else: - return False - -class Data: - """ - The L{Data} class provides an interface for decoding, extracting, - creating, and encoding arbitrary AMQP data. A L{Data} object - contains a tree of AMQP values. Leaf nodes in this tree correspond - to scalars in the AMQP type system such as L{ints<INT>} or - L{strings<STRING>}. Non-leaf nodes in this tree correspond to - compound values in the AMQP type system such as L{lists<LIST>}, - L{maps<MAP>}, L{arrays<ARRAY>}, or L{described values<DESCRIBED>}. - The root node of the tree is the L{Data} object itself and can have - an arbitrary number of children. - - A L{Data} object maintains the notion of the current sibling node - and a current parent node. Siblings are ordered within their parent. - Values are accessed and/or added by using the L{next}, L{prev}, - L{enter}, and L{exit} methods to navigate to the desired location in - the tree and using the supplied variety of put_*/get_* methods to - access or add a value of the desired type. - - The put_* methods will always add a value I{after} the current node - in the tree. If the current node has a next sibling the put_* method - will overwrite the value on this node. If there is no current node - or the current node has no next sibling then one will be added. The - put_* methods always set the added/modified node to the current - node. The get_* methods read the value of the current node and do - not change which node is current. - - The following types of scalar values are supported: - - - L{NULL} - - L{BOOL} - - L{UBYTE} - - L{USHORT} - - L{SHORT} - - L{UINT} - - L{INT} - - L{ULONG} - - L{LONG} - - L{FLOAT} - - L{DOUBLE} - - L{BINARY} - - L{STRING} - - L{SYMBOL} - - The following types of compound values are supported: - - - L{DESCRIBED} - - L{ARRAY} - - L{LIST} - - L{MAP} - """ - - NULL = PN_NULL; "A null value." - BOOL = PN_BOOL; "A boolean value." - UBYTE = PN_UBYTE; "An unsigned byte value." - BYTE = PN_BYTE; "A signed byte value." - USHORT = PN_USHORT; "An unsigned short value." - SHORT = PN_SHORT; "A short value." - UINT = PN_UINT; "An unsigned int value." - INT = PN_INT; "A signed int value." - CHAR = PN_CHAR; "A character value." - ULONG = PN_ULONG; "An unsigned long value." - LONG = PN_LONG; "A signed long value." - TIMESTAMP = PN_TIMESTAMP; "A timestamp value." - FLOAT = PN_FLOAT; "A float value." - DOUBLE = PN_DOUBLE; "A double value." - DECIMAL32 = PN_DECIMAL32; "A DECIMAL32 value." - DECIMAL64 = PN_DECIMAL64; "A DECIMAL64 value." - DECIMAL128 = PN_DECIMAL128; "A DECIMAL128 value." - UUID = PN_UUID; "A UUID value." - BINARY = PN_BINARY; "A binary string." - STRING = PN_STRING; "A unicode string." - SYMBOL = PN_SYMBOL; "A symbolic string." - DESCRIBED = PN_DESCRIBED; "A described value." - ARRAY = PN_ARRAY; "An array value." - LIST = PN_LIST; "A list value." - MAP = PN_MAP; "A map value." - - type_names = { - NULL: "null", - BOOL: "bool", - BYTE: "byte", - UBYTE: "ubyte", - SHORT: "short", - USHORT: "ushort", - INT: "int", - UINT: "uint", - CHAR: "char", - LONG: "long", - ULONG: "ulong", - TIMESTAMP: "timestamp", - FLOAT: "float", - DOUBLE: "double", - DECIMAL32: "decimal32", - DECIMAL64: "decimal64", - DECIMAL128: "decimal128", - UUID: "uuid", - BINARY: "binary", - STRING: "string", - SYMBOL: "symbol", - DESCRIBED: "described", - ARRAY: "array", - LIST: "list", - MAP: "map" - } - - @classmethod - def type_name(type): return Data.type_names[type] - - def __init__(self, capacity=16): - if type(capacity) in (int, long): - self._data = pn_data(capacity) - self._free = True - else: - self._data = capacity - self._free = False - - def __del__(self): - if self._free and hasattr(self, "_data"): - pn_data_free(self._data) - del self._data - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, DataException) - raise exc("[%s]: %s" % (err, pn_error_text(pn_data_error(self._data)))) - else: - return err - - def clear(self): - """ - Clears the data object. - """ - pn_data_clear(self._data) - - def rewind(self): - """ - Clears current node and sets the parent to the root node. Clearing the - current node sets it _before_ the first node, calling next() will advance to - the first node. - """ - assert self._data is not None - pn_data_rewind(self._data) - - def next(self): - """ - Advances the current node to its next sibling and returns its - type. If there is no next sibling the current node remains - unchanged and None is returned. - """ - found = pn_data_next(self._data) - if found: - return self.type() - else: - return None - - def prev(self): - """ - Advances the current node to its previous sibling and returns its - type. If there is no previous sibling the current node remains - unchanged and None is returned. - """ - found = pn_data_prev(self._data) - if found: - return self.type() - else: - return None - - def enter(self): - """ - Sets the parent node to the current node and clears the current node. - Clearing the current node sets it _before_ the first child, - call next() advances to the first child. - """ - return pn_data_enter(self._data) - - def exit(self): - """ - Sets the current node to the parent node and the parent node to - its own parent. - """ - return pn_data_exit(self._data) - - def lookup(self, name): - return pn_data_lookup(self._data, name) - - def narrow(self): - pn_data_narrow(self._data) - - def widen(self): - pn_data_widen(self._data) - - def type(self): - """ - Returns the type of the current node. - """ - dtype = pn_data_type(self._data) - if dtype == -1: - return None - else: - return dtype - - def encoded_size(self): - """ - Returns the size in bytes needed to encode the data in AMQP format. - """ - return pn_data_encoded_size(self._data) - - def encode(self): - """ - Returns a representation of the data encoded in AMQP format. - """ - size = 1024 - while True: - cd, enc = pn_data_encode(self._data, size) - if cd == PN_OVERFLOW: - size *= 2 - elif cd >= 0: - return enc - else: - self._check(cd) - - def decode(self, encoded): - """ - Decodes the first value from supplied AMQP data and returns the - number of bytes consumed. - - @type encoded: binary - @param encoded: AMQP encoded binary data - """ - return self._check(pn_data_decode(self._data, encoded)) - - def put_list(self): - """ - Puts a list value. Elements may be filled by entering the list - node and putting element values. - - >>> data = Data() - >>> data.put_list() - >>> data.enter() - >>> data.put_int(1) - >>> data.put_int(2) - >>> data.put_int(3) - >>> data.exit() - """ - self._check(pn_data_put_list(self._data)) - - def put_map(self): - """ - Puts a map value. Elements may be filled by entering the map node - and putting alternating key value pairs. - - >>> data = Data() - >>> data.put_map() - >>> data.enter() - >>> data.put_string("key") - >>> data.put_string("value") - >>> data.exit() - """ - self._check(pn_data_put_map(self._data)) - - def put_array(self, described, element_type): - """ - Puts an array value. Elements may be filled by entering the array - node and putting the element values. The values must all be of the - specified array element type. If an array is described then the - first child value of the array is the descriptor and may be of any - type. - - >>> data = Data() - >>> - >>> data.put_array(False, Data.INT) - >>> data.enter() - >>> data.put_int(1) - >>> data.put_int(2) - >>> data.put_int(3) - >>> data.exit() - >>> - >>> data.put_array(True, Data.DOUBLE) - >>> data.enter() - >>> data.put_symbol("array-descriptor") - >>> data.put_double(1.1) - >>> data.put_double(1.2) - >>> data.put_double(1.3) - >>> data.exit() - - @type described: bool - @param described: specifies whether the array is described - @type element_type: int - @param element_type: the type of the array elements - """ - self._check(pn_data_put_array(self._data, described, element_type)) - - def put_described(self): - """ - Puts a described value. A described node has two children, the - descriptor and the value. These are specified by entering the node - and putting the desired values. - - >>> data = Data() - >>> data.put_described() - >>> data.enter() - >>> data.put_symbol("value-descriptor") - >>> data.put_string("the value") - >>> data.exit() - """ - self._check(pn_data_put_described(self._data)) - - def put_null(self): - """ - Puts a null value. - """ - self._check(pn_data_put_null(self._data)) - - def put_bool(self, b): - """ - Puts a boolean value. - - @param b: a boolean value - """ - self._check(pn_data_put_bool(self._data, b)) - - def put_ubyte(self, ub): - """ - Puts an unsigned byte value. - - @param ub: an integral value - """ - self._check(pn_data_put_ubyte(self._data, ub)) - - def put_byte(self, b): - """ - Puts a signed byte value. - - @param b: an integral value - """ - self._check(pn_data_put_byte(self._data, b)) - - def put_ushort(self, us): - """ - Puts an unsigned short value. - - @param us: an integral value. - """ - self._check(pn_data_put_ushort(self._data, us)) - - def put_short(self, s): - """ - Puts a signed short value. - - @param s: an integral value - """ - self._check(pn_data_put_short(self._data, s)) - - def put_uint(self, ui): - """ - Puts an unsigned int value. - - @param ui: an integral value - """ - self._check(pn_data_put_uint(self._data, ui)) - - def put_int(self, i): - """ - Puts a signed int value. - - @param i: an integral value - """ - self._check(pn_data_put_int(self._data, i)) - - def put_char(self, c): - """ - Puts a char value. - - @param c: a single character - """ - self._check(pn_data_put_char(self._data, ord(c))) - - def put_ulong(self, ul): - """ - Puts an unsigned long value. - - @param ul: an integral value - """ - self._check(pn_data_put_ulong(self._data, ul)) - - def put_long(self, l): - """ - Puts a signed long value. - - @param l: an integral value - """ - self._check(pn_data_put_long(self._data, l)) - - def put_timestamp(self, t): - """ - Puts a timestamp value. - - @param t: an integral value - """ - self._check(pn_data_put_timestamp(self._data, t)) - - def put_float(self, f): - """ - Puts a float value. - - @param f: a floating point value - """ - self._check(pn_data_put_float(self._data, f)) - - def put_double(self, d): - """ - Puts a double value. - - @param d: a floating point value. - """ - self._check(pn_data_put_double(self._data, d)) - - def put_decimal32(self, d): - """ - Puts a decimal32 value. - - @param d: a decimal32 value - """ - self._check(pn_data_put_decimal32(self._data, d)) - - def put_decimal64(self, d): - """ - Puts a decimal64 value. - - @param d: a decimal64 value - """ - self._check(pn_data_put_decimal64(self._data, d)) - - def put_decimal128(self, d): - """ - Puts a decimal128 value. - - @param d: a decimal128 value - """ - self._check(pn_data_put_decimal128(self._data, d)) - - def put_uuid(self, u): - """ - Puts a UUID value. - - @param u: a uuid value - """ - self._check(pn_data_put_uuid(self._data, u.bytes)) - - def put_binary(self, b): - """ - Puts a binary value. - - @type b: binary - @param b: a binary value - """ - self._check(pn_data_put_binary(self._data, b)) - - def put_memoryview(self, mv): - """Put a python memoryview object as an AMQP binary value""" - self.put_binary(mv.tobytes()) - - def put_buffer(self, buff): - """Put a python buffer object as an AMQP binary value""" - self.put_binary(bytes(buff)) - - def put_string(self, s): - """ - Puts a unicode value. - - @type s: unicode - @param s: a unicode value - """ - self._check(pn_data_put_string(self._data, s.encode("utf8"))) - - def put_symbol(self, s): - """ - Puts a symbolic value. - - @type s: string - @param s: the symbol name - """ - self._check(pn_data_put_symbol(self._data, s.encode('ascii'))) - - def get_list(self): - """ - If the current node is a list, return the number of elements, - otherwise return zero. List elements can be accessed by entering - the list. - - >>> count = data.get_list() - >>> data.enter() - >>> for i in range(count): - ... type = data.next() - ... if type == Data.STRING: - ... print data.get_string() - ... elif type == ...: - ... ... - >>> data.exit() - """ - return pn_data_get_list(self._data) - - def get_map(self): - """ - If the current node is a map, return the number of child elements, - otherwise return zero. Key value pairs can be accessed by entering - the map. - - >>> count = data.get_map() - >>> data.enter() - >>> for i in range(count/2): - ... type = data.next() - ... if type == Data.STRING: - ... print data.get_string() - ... elif type == ...: - ... ... - >>> data.exit() - """ - return pn_data_get_map(self._data) - - def get_array(self): - """ - If the current node is an array, return a tuple of the element - count, a boolean indicating whether the array is described, and - the type of each element, otherwise return (0, False, None). Array - data can be accessed by entering the array. - - >>> # read an array of strings with a symbolic descriptor - >>> count, described, type = data.get_array() - >>> data.enter() - >>> data.next() - >>> print "Descriptor:", data.get_symbol() - >>> for i in range(count): - ... data.next() - ... print "Element:", data.get_string() - >>> data.exit() - """ - count = pn_data_get_array(self._data) - described = pn_data_is_array_described(self._data) - type = pn_data_get_array_type(self._data) - if type == -1: - type = None - return count, described, type +# This private NullHandler is required for Python 2.6, +# when we no longer support 2.6 replace this NullHandler class definition and assignment with: +# handler = logging.NullHandler() +class NullHandler(logging.Handler): + def handle(self, record): + pass - def is_described(self): - """ - Checks if the current node is a described value. The descriptor - and value may be accessed by entering the described value. + def emit(self, record): + pass - >>> # read a symbolically described string - >>> assert data.is_described() # will error if the current node is not described - >>> data.enter() - >>> data.next() - >>> print data.get_symbol() - >>> data.next() - >>> print data.get_string() - >>> data.exit() - """ - return pn_data_is_described(self._data) + def createLock(self): + self.lock = None - def is_null(self): - """ - Checks if the current node is a null. - """ - return pn_data_is_null(self._data) - def get_bool(self): - """ - If the current node is a boolean, returns its value, returns False - otherwise. - """ - return pn_data_get_bool(self._data) +handler = NullHandler() - def get_ubyte(self): - """ - If the current node is an unsigned byte, returns its value, - returns 0 otherwise. - """ - return ubyte(pn_data_get_ubyte(self._data)) +log = logging.getLogger("proton") +log.addHandler(handler) - def get_byte(self): - """ - If the current node is a signed byte, returns its value, returns 0 - otherwise. - """ - return byte(pn_data_get_byte(self._data)) - def get_ushort(self): - """ - If the current node is an unsigned short, returns its value, - returns 0 otherwise. - """ - return ushort(pn_data_get_ushort(self._data)) - - def get_short(self): - """ - If the current node is a signed short, returns its value, returns - 0 otherwise. - """ - return short(pn_data_get_short(self._data)) - - def get_uint(self): - """ - If the current node is an unsigned int, returns its value, returns - 0 otherwise. - """ - return uint(pn_data_get_uint(self._data)) - - def get_int(self): - """ - If the current node is a signed int, returns its value, returns 0 - otherwise. - """ - return int32(pn_data_get_int(self._data)) - - def get_char(self): - """ - If the current node is a char, returns its value, returns 0 - otherwise. - """ - return char(_compat.unichr(pn_data_get_char(self._data))) - - def get_ulong(self): - """ - If the current node is an unsigned long, returns its value, - returns 0 otherwise. - """ - return ulong(pn_data_get_ulong(self._data)) - - def get_long(self): - """ - If the current node is an signed long, returns its value, returns - 0 otherwise. - """ - return long(pn_data_get_long(self._data)) - - def get_timestamp(self): - """ - If the current node is a timestamp, returns its value, returns 0 - otherwise. - """ - return timestamp(pn_data_get_timestamp(self._data)) - - def get_float(self): - """ - If the current node is a float, returns its value, raises 0 - otherwise. - """ - return float32(pn_data_get_float(self._data)) - - def get_double(self): - """ - If the current node is a double, returns its value, returns 0 - otherwise. - """ - return pn_data_get_double(self._data) - - # XXX: need to convert - def get_decimal32(self): - """ - If the current node is a decimal32, returns its value, returns 0 - otherwise. - """ - return decimal32(pn_data_get_decimal32(self._data)) - - # XXX: need to convert - def get_decimal64(self): - """ - If the current node is a decimal64, returns its value, returns 0 - otherwise. - """ - return decimal64(pn_data_get_decimal64(self._data)) - - # XXX: need to convert - def get_decimal128(self): - """ - If the current node is a decimal128, returns its value, returns 0 - otherwise. - """ - return decimal128(pn_data_get_decimal128(self._data)) - - def get_uuid(self): - """ - If the current node is a UUID, returns its value, returns None - otherwise. - """ - if pn_data_type(self._data) == Data.UUID: - return uuid.UUID(bytes=pn_data_get_uuid(self._data)) - else: - return None - - def get_binary(self): - """ - If the current node is binary, returns its value, returns "" - otherwise. - """ - return pn_data_get_binary(self._data) - - def get_string(self): - """ - If the current node is a string, returns its value, returns "" - otherwise. - """ - return pn_data_get_string(self._data).decode("utf8") - - def get_symbol(self): - """ - If the current node is a symbol, returns its value, returns "" - otherwise. - """ - return symbol(pn_data_get_symbol(self._data).decode('ascii')) - - def copy(self, src): - self._check(pn_data_copy(self._data, src._data)) - - def format(self): - sz = 16 - while True: - err, result = pn_data_format(self._data, sz) - if err == PN_OVERFLOW: - sz *= 2 - continue - else: - self._check(err) - return result - - def dump(self): - pn_data_dump(self._data) - - def put_dict(self, d): - self.put_map() - self.enter() - try: - for k, v in d.items(): - self.put_object(k) - self.put_object(v) - finally: - self.exit() - - def get_dict(self): - if self.enter(): - try: - result = {} - while self.next(): - k = self.get_object() - if self.next(): - v = self.get_object() - else: - v = None - result[k] = v - finally: - self.exit() - return result - - def put_sequence(self, s): - self.put_list() - self.enter() - try: - for o in s: - self.put_object(o) - finally: - self.exit() - - def get_sequence(self): - if self.enter(): - try: - result = [] - while self.next(): - result.append(self.get_object()) - finally: - self.exit() - return result - - def get_py_described(self): - if self.enter(): - try: - self.next() - descriptor = self.get_object() - self.next() - value = self.get_object() - finally: - self.exit() - return Described(descriptor, value) - - def put_py_described(self, d): - self.put_described() - self.enter() - try: - self.put_object(d.descriptor) - self.put_object(d.value) - finally: - self.exit() - - def get_py_array(self): - """ - If the current node is an array, return an Array object - representing the array and its contents. Otherwise return None. - This is a convenience wrapper around get_array, enter, etc. - """ - - count, described, type = self.get_array() - if type is None: return None - if self.enter(): - try: - if described: - self.next() - descriptor = self.get_object() - else: - descriptor = UNDESCRIBED - elements = [] - while self.next(): - elements.append(self.get_object()) - finally: - self.exit() - return Array(descriptor, type, *elements) - - def put_py_array(self, a): - described = a.descriptor != UNDESCRIBED - self.put_array(described, a.type) - self.enter() - try: - if described: - self.put_object(a.descriptor) - for e in a.elements: - self.put_object(e) - finally: - self.exit() - - put_mappings = { - None.__class__: lambda s, _: s.put_null(), - bool: put_bool, - ubyte: put_ubyte, - ushort: put_ushort, - uint: put_uint, - ulong: put_ulong, - byte: put_byte, - short: put_short, - int32: put_int, - long: put_long, - float32: put_float, - float: put_double, - decimal32: put_decimal32, - decimal64: put_decimal64, - decimal128: put_decimal128, - char: put_char, - timestamp: put_timestamp, - uuid.UUID: put_uuid, - bytes: put_binary, - unicode: put_string, - symbol: put_symbol, - list: put_sequence, - tuple: put_sequence, - dict: put_dict, - Described: put_py_described, - Array: put_py_array - } - # for python 3.x, long is merely an alias for int, but for python 2.x - # we need to add an explicit int since it is a different type - if int not in put_mappings: - put_mappings[int] = put_int - # Python >=3.0 has 'memoryview', <=2.5 has 'buffer', >=2.6 has both. - try: put_mappings[memoryview] = put_memoryview - except NameError: pass - try: put_mappings[buffer] = put_buffer - except NameError: pass - get_mappings = { - NULL: lambda s: None, - BOOL: get_bool, - BYTE: get_byte, - UBYTE: get_ubyte, - SHORT: get_short, - USHORT: get_ushort, - INT: get_int, - UINT: get_uint, - CHAR: get_char, - LONG: get_long, - ULONG: get_ulong, - TIMESTAMP: get_timestamp, - FLOAT: get_float, - DOUBLE: get_double, - DECIMAL32: get_decimal32, - DECIMAL64: get_decimal64, - DECIMAL128: get_decimal128, - UUID: get_uuid, - BINARY: get_binary, - STRING: get_string, - SYMBOL: get_symbol, - DESCRIBED: get_py_described, - ARRAY: get_py_array, - LIST: get_sequence, - MAP: get_dict - } - - - def put_object(self, obj): - putter = self.put_mappings[obj.__class__] - putter(self, obj) - - def get_object(self): - type = self.type() - if type is None: return None - getter = self.get_mappings.get(type) - if getter: - return getter(self) - else: - return UnmappedType(str(type)) - -class ConnectionException(ProtonException): - pass - -class Endpoint(object): - - LOCAL_UNINIT = PN_LOCAL_UNINIT - REMOTE_UNINIT = PN_REMOTE_UNINIT - LOCAL_ACTIVE = PN_LOCAL_ACTIVE - REMOTE_ACTIVE = PN_REMOTE_ACTIVE - LOCAL_CLOSED = PN_LOCAL_CLOSED - REMOTE_CLOSED = PN_REMOTE_CLOSED - - def _init(self): - self.condition = None - - def _update_cond(self): - obj2cond(self.condition, self._get_cond_impl()) - - @property - def remote_condition(self): - return cond2obj(self._get_remote_cond_impl()) - - # the following must be provided by subclasses - def _get_cond_impl(self): - assert False, "Subclass must override this!" - - def _get_remote_cond_impl(self): - assert False, "Subclass must override this!" - - def _get_handler(self): - from . import reactor - ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) - if ractor: - on_error = ractor.on_error_delegate() - else: - on_error = None - record = self._get_attachments() - return WrappedHandler.wrap(pn_record_get_handler(record), on_error) - - def _set_handler(self, handler): - from . import reactor - ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl)) - if ractor: - on_error = ractor.on_error_delegate() - else: - on_error = None - impl = _chandler(handler, on_error) - record = self._get_attachments() - pn_record_set_handler(record, impl) - pn_decref(impl) - - handler = property(_get_handler, _set_handler) - - @property - def transport(self): - return self.connection.transport - -class Condition: - - def __init__(self, name, description=None, info=None): - self.name = name - self.description = description - self.info = info - - def __repr__(self): - return "Condition(%s)" % ", ".join([repr(x) for x in - (self.name, self.description, self.info) - if x]) - - def __eq__(self, o): - if not isinstance(o, Condition): return False - return self.name == o.name and \ - self.description == o.description and \ - self.info == o.info - -def obj2cond(obj, cond): - pn_condition_clear(cond) - if obj: - pn_condition_set_name(cond, str(obj.name)) - pn_condition_set_description(cond, obj.description) - info = Data(pn_condition_info(cond)) - if obj.info: - info.put_object(obj.info) - -def cond2obj(cond): - if pn_condition_is_set(cond): - return Condition(pn_condition_get_name(cond), - pn_condition_get_description(cond), - dat2obj(pn_condition_info(cond))) - else: - return None - -def dat2obj(dimpl): - if dimpl: - d = Data(dimpl) - d.rewind() - d.next() - obj = d.get_object() - d.rewind() - return obj - -def obj2dat(obj, dimpl): - if obj is not None: - d = Data(dimpl) - d.put_object(obj) - -def secs2millis(secs): - return long(secs*1000) - -def millis2secs(millis): - return float(millis)/1000.0 - -def timeout2millis(secs): - if secs is None: return PN_MILLIS_MAX - return secs2millis(secs) - -def millis2timeout(millis): - if millis == PN_MILLIS_MAX: return None - return millis2secs(millis) - -def unicode2utf8(string): - """Some Proton APIs expect a null terminated string. Convert python text - types to UTF8 to avoid zero bytes introduced by other multi-byte encodings. - This method will throw if the string cannot be converted. - """ - if string is None: - return None - elif isinstance(string, str): - # Must be py2 or py3 str - # The swig binding converts py3 str -> utf8 char* and back sutomatically - return string - elif isinstance(string, unicode): - # This must be python2 unicode as we already detected py3 str above - return string.encode('utf-8') - # Anything else illegal - specifically python3 bytes - raise TypeError("Unrecognized string type: %r (%s)" % (string, type(string))) - -def utf82unicode(string): - """Convert C strings returned from proton-c into python unicode""" - if string is None: - return None - elif isinstance(string, unicode): - # py2 unicode, py3 str (via hack definition) - return string - elif isinstance(string, bytes): - # py2 str (via hack definition), py3 bytes - return string.decode('utf8') - raise TypeError("Unrecognized string type") - -class Connection(Wrapper, Endpoint): - """ - A representation of an AMQP connection - """ - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Connection(impl) - - def __init__(self, impl = pn_connection): - Wrapper.__init__(self, impl, pn_connection_attachments) - - def _init(self): - Endpoint._init(self) - self.offered_capabilities = None - self.desired_capabilities = None - self.properties = None - - def _get_attachments(self): - return pn_connection_attachments(self._impl) - - @property - def connection(self): - return self - - @property - def transport(self): - return Transport.wrap(pn_connection_transport(self._impl)) - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, ConnectionException) - raise exc("[%s]: %s" % (err, pn_connection_error(self._impl))) - else: - return err - - def _get_cond_impl(self): - return pn_connection_condition(self._impl) - - def _get_remote_cond_impl(self): - return pn_connection_remote_condition(self._impl) - - def collect(self, collector): - if collector is None: - pn_connection_collect(self._impl, None) - else: - pn_connection_collect(self._impl, collector._impl) - self._collector = weakref.ref(collector) - - def _get_container(self): - return utf82unicode(pn_connection_get_container(self._impl)) - def _set_container(self, name): - return pn_connection_set_container(self._impl, unicode2utf8(name)) - - container = property(_get_container, _set_container) - - def _get_hostname(self): - return utf82unicode(pn_connection_get_hostname(self._impl)) - def _set_hostname(self, name): - return pn_connection_set_hostname(self._impl, unicode2utf8(name)) - - hostname = property(_get_hostname, _set_hostname, - doc=""" -Set the name of the host (either fully qualified or relative) to which this -connection is connecting to. This information may be used by the remote -peer to determine the correct back-end service to connect the client to. -This value will be sent in the Open performative, and will be used by SSL -and SASL layers to identify the peer. -""") - - def _get_user(self): - return utf82unicode(pn_connection_get_user(self._impl)) - def _set_user(self, name): - return pn_connection_set_user(self._impl, unicode2utf8(name)) - - user = property(_get_user, _set_user) - - def _get_password(self): - return None - def _set_password(self, name): - return pn_connection_set_password(self._impl, unicode2utf8(name)) - - password = property(_get_password, _set_password) - - @property - def remote_container(self): - """The container identifier specified by the remote peer for this connection.""" - return pn_connection_remote_container(self._impl) - - @property - def remote_hostname(self): - """The hostname specified by the remote peer for this connection.""" - return pn_connection_remote_hostname(self._impl) - - @property - def remote_offered_capabilities(self): - """The capabilities offered by the remote peer for this connection.""" - return dat2obj(pn_connection_remote_offered_capabilities(self._impl)) - - @property - def remote_desired_capabilities(self): - """The capabilities desired by the remote peer for this connection.""" - return dat2obj(pn_connection_remote_desired_capabilities(self._impl)) - - @property - def remote_properties(self): - """The properties specified by the remote peer for this connection.""" - return dat2obj(pn_connection_remote_properties(self._impl)) - - def open(self): - """ - Opens the connection. - - In more detail, this moves the local state of the connection to - the ACTIVE state and triggers an open frame to be sent to the - peer. A connection is fully active once both peers have opened it. - """ - obj2dat(self.offered_capabilities, - pn_connection_offered_capabilities(self._impl)) - obj2dat(self.desired_capabilities, - pn_connection_desired_capabilities(self._impl)) - obj2dat(self.properties, pn_connection_properties(self._impl)) - pn_connection_open(self._impl) - - def close(self): - """ - Closes the connection. - - In more detail, this moves the local state of the connection to - the CLOSED state and triggers a close frame to be sent to the - peer. A connection is fully closed once both peers have closed it. - """ - self._update_cond() - pn_connection_close(self._impl) - if hasattr(self, '_session_policy'): - # break circular ref - del self._session_policy - - @property - def state(self): - """ - The state of the connection as a bit field. The state has a local - and a remote component. Each of these can be in one of three - states: UNINIT, ACTIVE or CLOSED. These can be tested by masking - against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, - REMOTE_ACTIVE and REMOTE_CLOSED. - """ - return pn_connection_state(self._impl) - - def session(self): - """ - Returns a new session on this connection. - """ - ssn = pn_session(self._impl) - if ssn is None: - raise(SessionException("Session allocation failed.")) - else: - return Session(ssn) - - def session_head(self, mask): - return Session.wrap(pn_session_head(self._impl, mask)) - - def link_head(self, mask): - return Link.wrap(pn_link_head(self._impl, mask)) - - @property - def work_head(self): - return Delivery.wrap(pn_work_head(self._impl)) - - @property - def error(self): - return pn_error_code(pn_connection_error(self._impl)) - - def free(self): - pn_connection_release(self._impl) - -class SessionException(ProtonException): - pass - -class Session(Wrapper, Endpoint): - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Session(impl) - - def __init__(self, impl): - Wrapper.__init__(self, impl, pn_session_attachments) - - def _get_attachments(self): - return pn_session_attachments(self._impl) - - def _get_cond_impl(self): - return pn_session_condition(self._impl) - - def _get_remote_cond_impl(self): - return pn_session_remote_condition(self._impl) - - def _get_incoming_capacity(self): - return pn_session_get_incoming_capacity(self._impl) - - def _set_incoming_capacity(self, capacity): - pn_session_set_incoming_capacity(self._impl, capacity) - - incoming_capacity = property(_get_incoming_capacity, _set_incoming_capacity) - - def _get_outgoing_window(self): - return pn_session_get_outgoing_window(self._impl) - - def _set_outgoing_window(self, window): - pn_session_set_outgoing_window(self._impl, window) - - outgoing_window = property(_get_outgoing_window, _set_outgoing_window) - - @property - def outgoing_bytes(self): - return pn_session_outgoing_bytes(self._impl) - - @property - def incoming_bytes(self): - return pn_session_incoming_bytes(self._impl) - - def open(self): - pn_session_open(self._impl) - - def close(self): - self._update_cond() - pn_session_close(self._impl) - - def next(self, mask): - return Session.wrap(pn_session_next(self._impl, mask)) - - @property - def state(self): - return pn_session_state(self._impl) - - @property - def connection(self): - return Connection.wrap(pn_session_connection(self._impl)) - - def sender(self, name): - return Sender(pn_sender(self._impl, unicode2utf8(name))) - - def receiver(self, name): - return Receiver(pn_receiver(self._impl, unicode2utf8(name))) - - def free(self): - pn_session_free(self._impl) - -class LinkException(ProtonException): - pass - -class Link(Wrapper, Endpoint): - """ - A representation of an AMQP link, of which there are two concrete - implementations, Sender and Receiver. - """ - - SND_UNSETTLED = PN_SND_UNSETTLED - SND_SETTLED = PN_SND_SETTLED - SND_MIXED = PN_SND_MIXED - - RCV_FIRST = PN_RCV_FIRST - RCV_SECOND = PN_RCV_SECOND - - @staticmethod - def wrap(impl): - if impl is None: return None - if pn_link_is_sender(impl): - return Sender(impl) - else: - return Receiver(impl) - - def __init__(self, impl): - Wrapper.__init__(self, impl, pn_link_attachments) - - def _get_attachments(self): - return pn_link_attachments(self._impl) - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, LinkException) - raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl)))) - else: - return err - - def _get_cond_impl(self): - return pn_link_condition(self._impl) - - def _get_remote_cond_impl(self): - return pn_link_remote_condition(self._impl) - - def open(self): - """ - Opens the link. - - In more detail, this moves the local state of the link to the - ACTIVE state and triggers an attach frame to be sent to the - peer. A link is fully active once both peers have attached it. - """ - pn_link_open(self._impl) - - def close(self): - """ - Closes the link. - - In more detail, this moves the local state of the link to the - CLOSED state and triggers an detach frame (with the closed flag - set) to be sent to the peer. A link is fully closed once both - peers have detached it. - """ - self._update_cond() - pn_link_close(self._impl) - - @property - def state(self): - """ - The state of the link as a bit field. The state has a local - and a remote component. Each of these can be in one of three - states: UNINIT, ACTIVE or CLOSED. These can be tested by masking - against LOCAL_UNINIT, LOCAL_ACTIVE, LOCAL_CLOSED, REMOTE_UNINIT, - REMOTE_ACTIVE and REMOTE_CLOSED. - """ - return pn_link_state(self._impl) - - @property - def source(self): - """The source of the link as described by the local peer.""" - return Terminus(pn_link_source(self._impl)) - - @property - def target(self): - """The target of the link as described by the local peer.""" - return Terminus(pn_link_target(self._impl)) - - @property - def remote_source(self): - """The source of the link as described by the remote peer.""" - return Terminus(pn_link_remote_source(self._impl)) - @property - def remote_target(self): - """The target of the link as described by the remote peer.""" - return Terminus(pn_link_remote_target(self._impl)) - - @property - def session(self): - return Session.wrap(pn_link_session(self._impl)) - - @property - def connection(self): - """The connection on which this link was attached.""" - return self.session.connection - - def delivery(self, tag): - return Delivery(pn_delivery(self._impl, tag)) - - @property - def current(self): - return Delivery.wrap(pn_link_current(self._impl)) - - def advance(self): - return pn_link_advance(self._impl) - - @property - def unsettled(self): - return pn_link_unsettled(self._impl) - - @property - def credit(self): - """The amount of outstanding credit on this link.""" - return pn_link_credit(self._impl) - - @property - def available(self): - return pn_link_available(self._impl) - - @property - def queued(self): - return pn_link_queued(self._impl) - - def next(self, mask): - return Link.wrap(pn_link_next(self._impl, mask)) - - @property - def name(self): - """Returns the name of the link""" - return utf82unicode(pn_link_name(self._impl)) - - @property - def is_sender(self): - """Returns true if this link is a sender.""" - return pn_link_is_sender(self._impl) - - @property - def is_receiver(self): - """Returns true if this link is a receiver.""" - return pn_link_is_receiver(self._impl) - - @property - def remote_snd_settle_mode(self): - return pn_link_remote_snd_settle_mode(self._impl) - - @property - def remote_rcv_settle_mode(self): - return pn_link_remote_rcv_settle_mode(self._impl) - - def _get_snd_settle_mode(self): - return pn_link_snd_settle_mode(self._impl) - def _set_snd_settle_mode(self, mode): - pn_link_set_snd_settle_mode(self._impl, mode) - snd_settle_mode = property(_get_snd_settle_mode, _set_snd_settle_mode) - - def _get_rcv_settle_mode(self): - return pn_link_rcv_settle_mode(self._impl) - def _set_rcv_settle_mode(self, mode): - pn_link_set_rcv_settle_mode(self._impl, mode) - rcv_settle_mode = property(_get_rcv_settle_mode, _set_rcv_settle_mode) - - def _get_drain(self): - return pn_link_get_drain(self._impl) - - def _set_drain(self, b): - pn_link_set_drain(self._impl, bool(b)) - - drain_mode = property(_get_drain, _set_drain) - - def drained(self): - return pn_link_drained(self._impl) - - @property - def remote_max_message_size(self): - return pn_link_remote_max_message_size(self._impl) - - def _get_max_message_size(self): - return pn_link_max_message_size(self._impl) - def _set_max_message_size(self, mode): - pn_link_set_max_message_size(self._impl, mode) - max_message_size = property(_get_max_message_size, _set_max_message_size) - - def detach(self): - return pn_link_detach(self._impl) - - def free(self): - pn_link_free(self._impl) - -class Terminus(object): - - UNSPECIFIED = PN_UNSPECIFIED - SOURCE = PN_SOURCE - TARGET = PN_TARGET - COORDINATOR = PN_COORDINATOR - - NONDURABLE = PN_NONDURABLE - CONFIGURATION = PN_CONFIGURATION - DELIVERIES = PN_DELIVERIES - - DIST_MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED - DIST_MODE_COPY = PN_DIST_MODE_COPY - DIST_MODE_MOVE = PN_DIST_MODE_MOVE - - EXPIRE_WITH_LINK = PN_EXPIRE_WITH_LINK - EXPIRE_WITH_SESSION = PN_EXPIRE_WITH_SESSION - EXPIRE_WITH_CONNECTION = PN_EXPIRE_WITH_CONNECTION - EXPIRE_NEVER = PN_EXPIRE_NEVER - - def __init__(self, impl): - self._impl = impl - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, LinkException) - raise exc("[%s]" % err) - else: - return err - - def _get_type(self): - return pn_terminus_get_type(self._impl) - def _set_type(self, type): - self._check(pn_terminus_set_type(self._impl, type)) - type = property(_get_type, _set_type) - - def _get_address(self): - """The address that identifies the source or target node""" - return utf82unicode(pn_terminus_get_address(self._impl)) - def _set_address(self, address): - self._check(pn_terminus_set_address(self._impl, unicode2utf8(address))) - address = property(_get_address, _set_address) - - def _get_durability(self): - return pn_terminus_get_durability(self._impl) - def _set_durability(self, seconds): - self._check(pn_terminus_set_durability(self._impl, seconds)) - durability = property(_get_durability, _set_durability) - - def _get_expiry_policy(self): - return pn_terminus_get_expiry_policy(self._impl) - def _set_expiry_policy(self, seconds): - self._check(pn_terminus_set_expiry_policy(self._impl, seconds)) - expiry_policy = property(_get_expiry_policy, _set_expiry_policy) - - def _get_timeout(self): - return pn_terminus_get_timeout(self._impl) - def _set_timeout(self, seconds): - self._check(pn_terminus_set_timeout(self._impl, seconds)) - timeout = property(_get_timeout, _set_timeout) - - def _is_dynamic(self): - """Indicates whether the source or target node was dynamically - created""" - return pn_terminus_is_dynamic(self._impl) - def _set_dynamic(self, dynamic): - self._check(pn_terminus_set_dynamic(self._impl, dynamic)) - dynamic = property(_is_dynamic, _set_dynamic) - - def _get_distribution_mode(self): - return pn_terminus_get_distribution_mode(self._impl) - def _set_distribution_mode(self, mode): - self._check(pn_terminus_set_distribution_mode(self._impl, mode)) - distribution_mode = property(_get_distribution_mode, _set_distribution_mode) - - @property - def properties(self): - """Properties of a dynamic source or target.""" - return Data(pn_terminus_properties(self._impl)) - - @property - def capabilities(self): - """Capabilities of the source or target.""" - return Data(pn_terminus_capabilities(self._impl)) - - @property - def outcomes(self): - return Data(pn_terminus_outcomes(self._impl)) - - @property - def filter(self): - """A filter on a source allows the set of messages transfered over - the link to be restricted""" - return Data(pn_terminus_filter(self._impl)) - - def copy(self, src): - self._check(pn_terminus_copy(self._impl, src._impl)) - -class Sender(Link): - """ - A link over which messages are sent. - """ - - def offered(self, n): - pn_link_offered(self._impl, n) - - def stream(self, data): - """ - Send specified data as part of the current delivery - - @type data: binary - @param data: data to send - """ - return self._check(pn_link_send(self._impl, data)) - - def send(self, obj, tag=None): - """ - Send specified object over this sender; the object is expected to - have a send() method on it that takes the sender and an optional - tag as arguments. - - Where the object is a Message, this will send the message over - this link, creating a new delivery for the purpose. - """ - if hasattr(obj, 'send'): - return obj.send(self, tag=tag) - else: - # treat object as bytes - return self.stream(obj) - - def delivery_tag(self): - if not hasattr(self, 'tag_generator'): - def simple_tags(): - count = 1 - while True: - yield str(count) - count += 1 - self.tag_generator = simple_tags() - return next(self.tag_generator) - -class Receiver(Link): - """ - A link over which messages are received. - """ - - def flow(self, n): - """Increases the credit issued to the remote sender by the specified number of messages.""" - pn_link_flow(self._impl, n) - - def recv(self, limit): - n, binary = pn_link_recv(self._impl, limit) - if n == PN_EOS: - return None - else: - self._check(n) - return binary - - def drain(self, n): - pn_link_drain(self._impl, n) - - def draining(self): - return pn_link_draining(self._impl) - -class NamedInt(int): - - values = {} - - def __new__(cls, i, name): - ni = super(NamedInt, cls).__new__(cls, i) - cls.values[i] = ni - return ni - - def __init__(self, i, name): - self.name = name - - def __repr__(self): - return self.name - - def __str__(self): - return self.name - - @classmethod - def get(cls, i): - return cls.values.get(i, i) - -class DispositionType(NamedInt): - values = {} - -class Disposition(object): - - RECEIVED = DispositionType(PN_RECEIVED, "RECEIVED") - ACCEPTED = DispositionType(PN_ACCEPTED, "ACCEPTED") - REJECTED = DispositionType(PN_REJECTED, "REJECTED") - RELEASED = DispositionType(PN_RELEASED, "RELEASED") - MODIFIED = DispositionType(PN_MODIFIED, "MODIFIED") - - def __init__(self, impl, local): - self._impl = impl - self.local = local - self._data = None - self._condition = None - self._annotations = None - - @property - def type(self): - return DispositionType.get(pn_disposition_type(self._impl)) - - def _get_section_number(self): - return pn_disposition_get_section_number(self._impl) - def _set_section_number(self, n): - pn_disposition_set_section_number(self._impl, n) - section_number = property(_get_section_number, _set_section_number) - - def _get_section_offset(self): - return pn_disposition_get_section_offset(self._impl) - def _set_section_offset(self, n): - pn_disposition_set_section_offset(self._impl, n) - section_offset = property(_get_section_offset, _set_section_offset) - - def _get_failed(self): - return pn_disposition_is_failed(self._impl) - def _set_failed(self, b): - pn_disposition_set_failed(self._impl, b) - failed = property(_get_failed, _set_failed) - - def _get_undeliverable(self): - return pn_disposition_is_undeliverable(self._impl) - def _set_undeliverable(self, b): - pn_disposition_set_undeliverable(self._impl, b) - undeliverable = property(_get_undeliverable, _set_undeliverable) - - def _get_data(self): - if self.local: - return self._data - else: - return dat2obj(pn_disposition_data(self._impl)) - def _set_data(self, obj): - if self.local: - self._data = obj - else: - raise AttributeError("data attribute is read-only") - data = property(_get_data, _set_data) - - def _get_annotations(self): - if self.local: - return self._annotations - else: - return dat2obj(pn_disposition_annotations(self._impl)) - def _set_annotations(self, obj): - if self.local: - self._annotations = obj - else: - raise AttributeError("annotations attribute is read-only") - annotations = property(_get_annotations, _set_annotations) - - def _get_condition(self): - if self.local: - return self._condition - else: - return cond2obj(pn_disposition_condition(self._impl)) - def _set_condition(self, obj): - if self.local: - self._condition = obj - else: - raise AttributeError("condition attribute is read-only") - condition = property(_get_condition, _set_condition) - -class Delivery(Wrapper): - """ - Tracks and/or records the delivery of a message over a link. - """ - - RECEIVED = Disposition.RECEIVED - ACCEPTED = Disposition.ACCEPTED - REJECTED = Disposition.REJECTED - RELEASED = Disposition.RELEASED - MODIFIED = Disposition.MODIFIED - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Delivery(impl) - - def __init__(self, impl): - Wrapper.__init__(self, impl, pn_delivery_attachments) - - def _init(self): - self.local = Disposition(pn_delivery_local(self._impl), True) - self.remote = Disposition(pn_delivery_remote(self._impl), False) - - @property - def tag(self): - """The identifier for the delivery.""" - return pn_delivery_tag(self._impl) - - @property - def writable(self): - """Returns true for an outgoing delivery to which data can now be written.""" - return pn_delivery_writable(self._impl) - - @property - def readable(self): - """Returns true for an incoming delivery that has data to read.""" - return pn_delivery_readable(self._impl) - - @property - def updated(self): - """Returns true if the state of the delivery has been updated - (e.g. it has been settled and/or accepted, rejected etc).""" - return pn_delivery_updated(self._impl) - - def update(self, state): - """ - Set the local state of the delivery e.g. ACCEPTED, REJECTED, RELEASED. - """ - obj2dat(self.local._data, pn_disposition_data(self.local._impl)) - obj2dat(self.local._annotations, pn_disposition_annotations(self.local._impl)) - obj2cond(self.local._condition, pn_disposition_condition(self.local._impl)) - pn_delivery_update(self._impl, state) - - @property - def pending(self): - return pn_delivery_pending(self._impl) - - @property - def partial(self): - """ - Returns true for an incoming delivery if not all the data is - yet available. - """ - return pn_delivery_partial(self._impl) - - @property - def local_state(self): - """Returns the local state of the delivery.""" - return DispositionType.get(pn_delivery_local_state(self._impl)) - - @property - def remote_state(self): - """ - Returns the state of the delivery as indicated by the remote - peer. - """ - return DispositionType.get(pn_delivery_remote_state(self._impl)) - - @property - def settled(self): - """ - Returns true if the delivery has been settled by the remote peer. - """ - return pn_delivery_settled(self._impl) - - def settle(self): - """ - Settles the delivery locally. This indicates the application - considers the delivery complete and does not wish to receive any - further events about it. Every delivery should be settled locally. - """ - pn_delivery_settle(self._impl) - - @property - def aborted(self): - """Returns true if the delivery has been aborted.""" - return pn_delivery_aborted(self._impl) - - def abort(self): - """ - Aborts the delivery. This indicates the application wishes to - invalidate any data that may have already been sent on this delivery. - The delivery cannot be aborted after it has been completely delivered. - """ - pn_delivery_abort(self._impl) - - @property - def work_next(self): - return Delivery.wrap(pn_work_next(self._impl)) - - @property - def link(self): - """ - Returns the link on which the delivery was sent or received. - """ - return Link.wrap(pn_delivery_link(self._impl)) - - @property - def session(self): - """ - Returns the session over which the delivery was sent or received. - """ - return self.link.session - - @property - def connection(self): - """ - Returns the connection over which the delivery was sent or received. - """ - return self.session.connection - - @property - def transport(self): - return self.connection.transport - -class TransportException(ProtonException): - pass - -class TraceAdapter: - - def __init__(self, tracer): - self.tracer = tracer - - def __call__(self, trans_impl, message): - self.tracer(Transport.wrap(trans_impl), message) - -class Transport(Wrapper): - - TRACE_OFF = PN_TRACE_OFF - TRACE_DRV = PN_TRACE_DRV - TRACE_FRM = PN_TRACE_FRM - TRACE_RAW = PN_TRACE_RAW - - CLIENT = 1 - SERVER = 2 - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Transport(_impl=impl) - - def __init__(self, mode=None, _impl = pn_transport): - Wrapper.__init__(self, _impl, pn_transport_attachments) - if mode == Transport.SERVER: - pn_transport_set_server(self._impl) - elif mode is None or mode==Transport.CLIENT: - pass - else: - raise TransportException("Cannot initialise Transport from mode: %s" % str(mode)) - - def _init(self): - self._sasl = None - self._ssl = None - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, TransportException) - raise exc("[%s]: %s" % (err, pn_error_text(pn_transport_error(self._impl)))) - else: - return err - - def _set_tracer(self, tracer): - pn_transport_set_pytracer(self._impl, TraceAdapter(tracer)); - - def _get_tracer(self): - adapter = pn_transport_get_pytracer(self._impl) - if adapter: - return adapter.tracer - else: - return None - - tracer = property(_get_tracer, _set_tracer, - doc=""" -A callback for trace logging. The callback is passed the transport and log message. -""") - - def log(self, message): - pn_transport_log(self._impl, message) - - def require_auth(self, bool): - pn_transport_require_auth(self._impl, bool) - - @property - def authenticated(self): - return pn_transport_is_authenticated(self._impl) - - def require_encryption(self, bool): - pn_transport_require_encryption(self._impl, bool) - - @property - def encrypted(self): - return pn_transport_is_encrypted(self._impl) - - @property - def user(self): - return pn_transport_get_user(self._impl) - - def bind(self, connection): - """Assign a connection to the transport""" - self._check(pn_transport_bind(self._impl, connection._impl)) - - def unbind(self): - """Release the connection""" - self._check(pn_transport_unbind(self._impl)) - - def trace(self, n): - pn_transport_trace(self._impl, n) - - def tick(self, now): - """Process any timed events (like heartbeat generation). - now = seconds since epoch (float). - """ - return millis2secs(pn_transport_tick(self._impl, secs2millis(now))) - - def capacity(self): - c = pn_transport_capacity(self._impl) - if c >= PN_EOS: - return c - else: - return self._check(c) - - def push(self, binary): - n = self._check(pn_transport_push(self._impl, binary)) - if n != len(binary): - raise OverflowError("unable to process all bytes: %s, %s" % (n, len(binary))) - - def close_tail(self): - self._check(pn_transport_close_tail(self._impl)) - - def pending(self): - p = pn_transport_pending(self._impl) - if p >= PN_EOS: - return p - else: - return self._check(p) - - def peek(self, size): - cd, out = pn_transport_peek(self._impl, size) - if cd == PN_EOS: - return None - else: - self._check(cd) - return out - - def pop(self, size): - pn_transport_pop(self._impl, size) - - def close_head(self): - self._check(pn_transport_close_head(self._impl)) - - @property - def closed(self): - return pn_transport_closed(self._impl) - - # AMQP 1.0 max-frame-size - def _get_max_frame_size(self): - return pn_transport_get_max_frame(self._impl) - - def _set_max_frame_size(self, value): - pn_transport_set_max_frame(self._impl, value) - - max_frame_size = property(_get_max_frame_size, _set_max_frame_size, - doc=""" -Sets the maximum size for received frames (in bytes). -""") - - @property - def remote_max_frame_size(self): - return pn_transport_get_remote_max_frame(self._impl) - - def _get_channel_max(self): - return pn_transport_get_channel_max(self._impl) - - def _set_channel_max(self, value): - if pn_transport_set_channel_max(self._impl, value): - raise SessionException("Too late to change channel max.") - - channel_max = property(_get_channel_max, _set_channel_max, - doc=""" -Sets the maximum channel that may be used on the transport. -""") - - @property - def remote_channel_max(self): - return pn_transport_remote_channel_max(self._impl) - - # AMQP 1.0 idle-time-out - def _get_idle_timeout(self): - return millis2secs(pn_transport_get_idle_timeout(self._impl)) - - def _set_idle_timeout(self, sec): - pn_transport_set_idle_timeout(self._impl, secs2millis(sec)) - - idle_timeout = property(_get_idle_timeout, _set_idle_timeout, - doc=""" -The idle timeout of the connection (float, in seconds). -""") - - @property - def remote_idle_timeout(self): - return millis2secs(pn_transport_get_remote_idle_timeout(self._impl)) - - @property - def frames_output(self): - return pn_transport_get_frames_output(self._impl) - - @property - def frames_input(self): - return pn_transport_get_frames_input(self._impl) - - def sasl(self): - return SASL(self) - - def ssl(self, domain=None, session_details=None): - # SSL factory (singleton for this transport) - if not self._ssl: - self._ssl = SSL(self, domain, session_details) - return self._ssl - - @property - def condition(self): - return cond2obj(pn_transport_condition(self._impl)) - - @property - def connection(self): - return Connection.wrap(pn_transport_connection(self._impl)) - -class SASLException(TransportException): - pass - -class SASL(Wrapper): - - OK = PN_SASL_OK - AUTH = PN_SASL_AUTH - SYS = PN_SASL_SYS - PERM = PN_SASL_PERM - TEMP = PN_SASL_TEMP - - @staticmethod - def extended(): - return pn_sasl_extended() - - def __init__(self, transport): - Wrapper.__init__(self, transport._impl, pn_transport_attachments) - self._sasl = pn_sasl(transport._impl) - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, SASLException) - raise exc("[%s]" % (err)) - else: - return err - - @property - def user(self): - return pn_sasl_get_user(self._sasl) - - @property - def mech(self): - return pn_sasl_get_mech(self._sasl) - - @property - def outcome(self): - outcome = pn_sasl_outcome(self._sasl) - if outcome == PN_SASL_NONE: - return None - else: - return outcome - - def allowed_mechs(self, mechs): - pn_sasl_allowed_mechs(self._sasl, unicode2utf8(mechs)) - - def _get_allow_insecure_mechs(self): - return pn_sasl_get_allow_insecure_mechs(self._sasl) - - def _set_allow_insecure_mechs(self, insecure): - pn_sasl_set_allow_insecure_mechs(self._sasl, insecure) - - allow_insecure_mechs = property(_get_allow_insecure_mechs, _set_allow_insecure_mechs, - doc=""" -Allow unencrypted cleartext passwords (PLAIN mech) -""") - - def done(self, outcome): - pn_sasl_done(self._sasl, outcome) - - def config_name(self, name): - pn_sasl_config_name(self._sasl, name) - - def config_path(self, path): - pn_sasl_config_path(self._sasl, path) - -class SSLException(TransportException): - pass - -class SSLUnavailable(SSLException): - pass - -class SSLDomain(object): - - MODE_CLIENT = PN_SSL_MODE_CLIENT - MODE_SERVER = PN_SSL_MODE_SERVER - VERIFY_PEER = PN_SSL_VERIFY_PEER - VERIFY_PEER_NAME = PN_SSL_VERIFY_PEER_NAME - ANONYMOUS_PEER = PN_SSL_ANONYMOUS_PEER - - def __init__(self, mode): - self._domain = pn_ssl_domain(mode) - if self._domain is None: - raise SSLUnavailable() - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, SSLException) - raise exc("SSL failure.") - else: - return err - - def set_credentials(self, cert_file, key_file, password): - return self._check( pn_ssl_domain_set_credentials(self._domain, - cert_file, key_file, - password) ) - def set_trusted_ca_db(self, certificate_db): - return self._check( pn_ssl_domain_set_trusted_ca_db(self._domain, - certificate_db) ) - def set_peer_authentication(self, verify_mode, trusted_CAs=None): - return self._check( pn_ssl_domain_set_peer_authentication(self._domain, - verify_mode, - trusted_CAs) ) - - def allow_unsecured_client(self): - return self._check( pn_ssl_domain_allow_unsecured_client(self._domain) ) - - def __del__(self): - pn_ssl_domain_free(self._domain) - -class SSL(object): - - @staticmethod - def present(): - return pn_ssl_present() - - def _check(self, err): - if err < 0: - exc = EXCEPTIONS.get(err, SSLException) - raise exc("SSL failure.") - else: - return err - - def __new__(cls, transport, domain, session_details=None): - """Enforce a singleton SSL object per Transport""" - if transport._ssl: - # unfortunately, we've combined the allocation and the configuration in a - # single step. So catch any attempt by the application to provide what - # may be a different configuration than the original (hack) - ssl = transport._ssl - if (domain and (ssl._domain is not domain) or - session_details and (ssl._session_details is not session_details)): - raise SSLException("Cannot re-configure existing SSL object!") - else: - obj = super(SSL, cls).__new__(cls) - obj._domain = domain - obj._session_details = session_details - session_id = None - if session_details: - session_id = session_details.get_session_id() - obj._ssl = pn_ssl( transport._impl ) - if obj._ssl is None: - raise SSLUnavailable() - if domain: - pn_ssl_init( obj._ssl, domain._domain, session_id ) - transport._ssl = obj - return transport._ssl - - def cipher_name(self): - rc, name = pn_ssl_get_cipher_name( self._ssl, 128 ) - if rc: - return name - return None - - def protocol_name(self): - rc, name = pn_ssl_get_protocol_name( self._ssl, 128 ) - if rc: - return name - return None - - SHA1 = PN_SSL_SHA1 - SHA256 = PN_SSL_SHA256 - SHA512 = PN_SSL_SHA512 - MD5 = PN_SSL_MD5 - - CERT_COUNTRY_NAME = PN_SSL_CERT_SUBJECT_COUNTRY_NAME - CERT_STATE_OR_PROVINCE = PN_SSL_CERT_SUBJECT_STATE_OR_PROVINCE - CERT_CITY_OR_LOCALITY = PN_SSL_CERT_SUBJECT_CITY_OR_LOCALITY - CERT_ORGANIZATION_NAME = PN_SSL_CERT_SUBJECT_ORGANIZATION_NAME - CERT_ORGANIZATION_UNIT = PN_SSL_CERT_SUBJECT_ORGANIZATION_UNIT - CERT_COMMON_NAME = PN_SSL_CERT_SUBJECT_COMMON_NAME - - def get_cert_subject_subfield(self, subfield_name): - subfield_value = pn_ssl_get_remote_subject_subfield(self._ssl, subfield_name) - return subfield_value - - def get_cert_subject(self): - subject = pn_ssl_get_remote_subject(self._ssl) - return subject - - def _get_cert_subject_unknown_subfield(self): - # Pass in an unhandled enum - return self.get_cert_subject_subfield(10) - - # Convenience functions for obtaining the subfields of the subject field. - def get_cert_common_name(self): - return self.get_cert_subject_subfield(SSL.CERT_COMMON_NAME) - - def get_cert_organization(self): - return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_NAME) - - def get_cert_organization_unit(self): - return self.get_cert_subject_subfield(SSL.CERT_ORGANIZATION_UNIT) - - def get_cert_locality_or_city(self): - return self.get_cert_subject_subfield(SSL.CERT_CITY_OR_LOCALITY) - - def get_cert_country(self): - return self.get_cert_subject_subfield(SSL.CERT_COUNTRY_NAME) - - def get_cert_state_or_province(self): - return self.get_cert_subject_subfield(SSL.CERT_STATE_OR_PROVINCE) - - def get_cert_fingerprint(self, fingerprint_length, digest_name): - rc, fingerprint_str = pn_ssl_get_cert_fingerprint(self._ssl, fingerprint_length, digest_name) - if rc == PN_OK: - return fingerprint_str - return None - - # Convenience functions for obtaining fingerprint for specific hashing algorithms - def _get_cert_fingerprint_unknown_hash_alg(self): - return self.get_cert_fingerprint(41, 10) - - def get_cert_fingerprint_sha1(self): - return self.get_cert_fingerprint(41, SSL.SHA1) - - def get_cert_fingerprint_sha256(self): - # sha256 produces a fingerprint that is 64 characters long - return self.get_cert_fingerprint(65, SSL.SHA256) - - def get_cert_fingerprint_sha512(self): - # sha512 produces a fingerprint that is 128 characters long - return self.get_cert_fingerprint(129, SSL.SHA512) - - def get_cert_fingerprint_md5(self): - return self.get_cert_fingerprint(33, SSL.MD5) - - @property - def remote_subject(self): - return pn_ssl_get_remote_subject( self._ssl ) - - RESUME_UNKNOWN = PN_SSL_RESUME_UNKNOWN - RESUME_NEW = PN_SSL_RESUME_NEW - RESUME_REUSED = PN_SSL_RESUME_REUSED - - def resume_status(self): - return pn_ssl_resume_status( self._ssl ) - - def _set_peer_hostname(self, hostname): - self._check(pn_ssl_set_peer_hostname( self._ssl, unicode2utf8(hostname) )) - def
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org