split code into utils, handlers and reactors modules
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4706c2b7 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4706c2b7 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4706c2b7 Branch: refs/heads/examples Commit: 4706c2b7c412cbffd7c9667ec2af9b0a437157c3 Parents: 38682d4 Author: Gordon Sim <[email protected]> Authored: Wed Nov 19 19:21:29 2014 +0000 Committer: Gordon Sim <[email protected]> Committed: Wed Nov 19 19:21:29 2014 +0000 ---------------------------------------------------------------------- tutorial/client.py | 3 +- tutorial/client_http.py | 2 +- tutorial/helloworld.py | 3 +- tutorial/helloworld_blocking.py | 3 +- tutorial/helloworld_direct.py | 3 +- tutorial/helloworld_direct_tornado.py | 2 +- tutorial/helloworld_simple.py | 41 - tutorial/helloworld_simplistic.py | 35 - tutorial/helloworld_tornado.py | 2 +- tutorial/proton_events.py | 1241 ---------------------------- tutorial/proton_reactors.py | 724 ++++++++++++++++ tutorial/proton_server.py | 4 +- tutorial/proton_tornado.py | 2 +- tutorial/recurring_timer.py | 2 +- tutorial/recurring_timer_tornado.py | 2 +- tutorial/selected_recv.py | 14 +- tutorial/server.py | 3 +- tutorial/server_tx.py | 3 +- tutorial/simple_recv.py | 3 +- tutorial/simple_send.py | 3 +- tutorial/sync_client.py | 4 +- tutorial/tx_recv.py | 7 +- tutorial/tx_send.py | 10 +- tutorial/tx_send_sync.py | 10 +- 24 files changed, 777 insertions(+), 1349 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/client.py ---------------------------------------------------------------------- diff --git a/tutorial/client.py b/tutorial/client.py index e3b705c..f2bc866 100755 --- a/tutorial/client.py +++ b/tutorial/client.py @@ -19,7 +19,8 @@ # from proton import Message -from proton_events import EventLoop, MessagingHandler +from proton_handlers import MessagingHandler +from proton_reactors import EventLoop class Client(MessagingHandler): def __init__(self, host, address, requests): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/client_http.py ---------------------------------------------------------------------- diff --git a/tutorial/client_http.py b/tutorial/client_http.py index afa0c78..9e7333b 100755 --- a/tutorial/client_http.py +++ b/tutorial/client_http.py @@ -19,7 +19,7 @@ # from proton import Message -from proton_events import MessagingHandler +from proton_handlers import MessagingHandler from proton_tornado import TornadoLoop from tornado.ioloop import IOLoop import tornado.web http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/helloworld.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld.py b/tutorial/helloworld.py index a3965a8..35a93da 100755 --- a/tutorial/helloworld.py +++ b/tutorial/helloworld.py @@ -19,7 +19,8 @@ # from proton import Message -from proton_events import EventLoop, MessagingHandler +from proton_handlers import MessagingHandler +from proton_reactors import EventLoop class HelloWorld(MessagingHandler): def __init__(self, server, address): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/helloworld_blocking.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_blocking.py b/tutorial/helloworld_blocking.py index 73d983d..2c4dca2 100755 --- a/tutorial/helloworld_blocking.py +++ b/tutorial/helloworld_blocking.py @@ -19,7 +19,8 @@ # from proton import Message -from proton_events import BlockingConnection, IncomingMessageHandler +from proton_utils import BlockingConnection +from proton_handlers import IncomingMessageHandler class HelloWorldReceiver(IncomingMessageHandler): def on_message(self, event): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/helloworld_direct.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_direct.py b/tutorial/helloworld_direct.py index fd70c0c..652039b 100755 --- a/tutorial/helloworld_direct.py +++ b/tutorial/helloworld_direct.py @@ -19,7 +19,8 @@ # from proton import Message -from proton_events import EventLoop, MessagingHandler +from proton_handlers import MessagingHandler +from proton_reactors import EventLoop class HelloWorld(MessagingHandler): def __init__(self, server, address): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/helloworld_direct_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_direct_tornado.py b/tutorial/helloworld_direct_tornado.py index 1982f18..1a8365a 100755 --- a/tutorial/helloworld_direct_tornado.py +++ b/tutorial/helloworld_direct_tornado.py @@ -19,7 +19,7 @@ # from proton import Message -from proton_events import MessagingHandler +from proton_handlers import MessagingHandler from proton_tornado import TornadoLoop class HelloWorld(MessagingHandler): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/helloworld_simple.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_simple.py b/tutorial/helloworld_simple.py deleted file mode 100755 index 35c4dbd..0000000 --- a/tutorial/helloworld_simple.py +++ /dev/null @@ -1,41 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from proton import Message -import proton_events - -class HelloWorld(proton_events.MessagingHandler): - def __init__(self): - super(HelloWorld, self).__init__() - - def on_credit(self, event): - event.sender.send_msg(Message(body=u"Hello World!")) - event.sender.close() - - def on_message(self, event): - print event.message.body - event.connection.close() - -conn = proton_events.connect("localhost:5672", handler=HelloWorld()) -conn.create_receiver("examples") -conn.create_sender("examples") -proton_events.run() - - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/helloworld_simplistic.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_simplistic.py b/tutorial/helloworld_simplistic.py deleted file mode 100755 index 31874be..0000000 --- a/tutorial/helloworld_simplistic.py +++ /dev/null @@ -1,35 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from proton import Message -from proton_events import EventLoop, FlowController, IncomingMessageHandler - -class HelloWorldReceiver(IncomingMessageHandler): - def on_message(self, event): - print event.message.body - event.connection.close() - -eventloop = EventLoop(FlowController(1)) -conn = eventloop.connect("localhost:5672") -conn.create_receiver("examples", handler=HelloWorldReceiver()) -sender = conn.create_sender("examples") -sender.send_msg(Message(body=u"Hello World!")); -eventloop.run() - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/helloworld_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_tornado.py b/tutorial/helloworld_tornado.py index be251ce..541eec4 100755 --- a/tutorial/helloworld_tornado.py +++ b/tutorial/helloworld_tornado.py @@ -19,7 +19,7 @@ # from proton import Message -from proton_events import MessagingHandler +from proton_handlers import MessagingHandler from proton_tornado import TornadoLoop class HelloWorld(MessagingHandler): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/proton_events.py ---------------------------------------------------------------------- diff --git a/tutorial/proton_events.py b/tutorial/proton_events.py deleted file mode 100644 index 51f5423..0000000 --- a/tutorial/proton_events.py +++ /dev/null @@ -1,1241 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# -import heapq, os, Queue, re, socket, time, types -from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url -from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout -from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException -from select import select - -class AmqpConnection(object): - - def __init__(self, conn, sock, events, heartbeat=None): - self.events = events - self.conn = conn - self.transport = Transport() - if heartbeat: self.transport.idle_timeout = heartbeat - self.transport.bind(self.conn) - self.socket = sock - self.socket.setblocking(0) - self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - self.write_done = False - self.read_done = False - self._closed = False - - def accept(self, force_sasl=True): - if force_sasl: - sasl = self.transport.sasl() - sasl.mechanisms("ANONYMOUS") - sasl.server() - sasl.done(SASL.OK) - #TODO: use SASL anyway if requested by peer - return self - - def connect(self, host, port=None, username=None, password=None, force_sasl=True): - if username and password: - sasl = self.transport.sasl() - sasl.plain(username, password) - elif force_sasl: - sasl = self.transport.sasl() - sasl.mechanisms('ANONYMOUS') - sasl.client() - try: - self.socket.connect_ex((host, port or 5672)) - except socket.gaierror, e: - raise ConnectionException("Cannot resolve '%s': %s" % (host, e)) - return self - - def _closed_cleanly(self): - return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED - - def closed(self): - if not self._closed and self.write_done and self.read_done: - self.close() - return True - else: - return False - - def close(self): - self.socket.close() - self._closed = True - - def fileno(self): - return self.socket.fileno() - - def reading(self): - if self.read_done: return False - c = self.transport.capacity() - if c > 0: - return True - elif c < 0: - self.read_done = True - return False - - def writing(self): - if self.write_done: return False - try: - p = self.transport.pending() - if p > 0: - return True - elif p < 0: - self.write_done = True - return False - else: # p == 0 - return False - except TransportException, e: - self.write_done = True - return False - - def readable(self): - c = self.transport.capacity() - if c > 0: - try: - data = self.socket.recv(c) - if data: - self.transport.push(data) - else: - if not self._closed_cleanly(): - self.read_done = True - self.write_done = True - else: - self.transport.close_tail() - except TransportException, e: - print "Error on read: %s" % e - self.read_done = True - except socket.error, e: - print "Error on recv: %s" % e - self.read_done = True - self.write_done = True - elif c < 0: - self.read_done = True - - def writable(self): - try: - p = self.transport.pending() - if p > 0: - data = self.transport.peek(p) - n = self.socket.send(data) - self.transport.pop(n) - elif p < 0: - self.write_done = True - except TransportException, e: - print "Error on write: %s" % e - self.write_done = True - except socket.error, e: - print "Error on send: %s" % e - self.write_done = True - - def removed(self): - if not self._closed_cleanly(): - self.transport.unbind() - self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn)) - - def tick(self): - t = self.transport.tick(time.time()) - if t: return t - time.time() - else: return None - -class Acceptor: - - def __init__(self, events, loop, host, port): - self.events = events - self.loop = loop - self.socket = socket.socket() - self.socket.setblocking(0) - self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - self.socket.bind((host, port)) - self.socket.listen(5) - self.loop.add(self) - self._closed = False - - def closed(self): - if self._closed: - self.socket.close() - return True - else: - return False - - def close(self): - self._closed = True - - def fileno(self): - return self.socket.fileno() - - def reading(self): - return not self._closed - - def writing(self): - return False - - def readable(self): - sock, addr = self.socket.accept() - if sock: - self.loop.add(AmqpConnection(self.events.connection(), sock, self.events).accept()) - - def removed(self): pass - def tick(self): return None - -class EventInjector(object): - def __init__(self, events): - self.events = events - self.queue = Queue.Queue() - self.pipe = os.pipe() - self._closed = False - - def trigger(self, event): - self.queue.put(event) - os.write(self.pipe[1], "!") - - def closed(self): - return self._closed and self.queue.empty() - - def close(self): - self._closed = True - - def fileno(self): - return self.pipe[0] - - def reading(self): - return True - - def writing(self): - return False - - def readable(self): - os.read(self.pipe[0], 512) - while not self.queue.empty(): - self.events.dispatch(self.queue.get()) - - def removed(self): pass - def tick(self): return None - -def nested_handlers(handlers): - # currently only allows for a single level of nesting - nested = [] - for h in handlers: - nested.append(h) - if hasattr(h, 'handlers'): - nested.extend(getattr(h, 'handlers')) - return nested - -def add_nested_handler(handler, nested): - if hasattr(handler, 'handlers'): - getattr(handler, 'handlers').append(nested) - else: - handler.handlers = [nested] - -class Events(object): - def __init__(self, *handlers): - self.collector = Collector() - self.handlers = handlers - - def connection(self): - conn = Connection() - conn.collect(self.collector) - return conn - - def process(self): - while True: - ev = self.collector.peek() - if ev: - self.dispatch(ev) - self.collector.pop() - else: - return - - def dispatch(self, event): - for h in self.handlers: - event.dispatch(h) - - @property - def next_interval(self): - return None - - @property - def empty(self): - return self.collector.peek() == None - -class ExtendedEventType(object): - def __init__(self, name): - self.name = name - self.method = "on_%s" % name - -class ApplicationEvent(Event): - def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): - self.type = ExtendedEventType(typename) - self.subject = subject - if delivery: - self.context = delivery - self.clazz = "pn_delivery" - elif link: - self.context = link - self.clazz = "pn_link" - elif session: - self.context = session - self.clazz = "pn_session" - elif connection: - self.context = connection - self.clazz = "pn_connection" - else: - self.context = None - self.clazz = "none" - - def __repr__(self): - objects = [self.context, self.subject] - return "%s(%s)" % (self.type.name, - ", ".join([str(o) for o in objects if o is not None])) - -class StartEvent(ApplicationEvent): - def __init__(self, reactor): - super(StartEvent, self).__init__("start") - self.reactor = reactor - -class ScheduledEvents(Events): - def __init__(self, *handlers): - super(ScheduledEvents, self).__init__(*handlers) - self._events = [] - - def schedule(self, deadline, event): - heapq.heappush(self._events, (deadline, event)) - - def process(self): - super(ScheduledEvents, self).process() - while self._events and self._events[0][0] <= time.time(): - self.dispatch(heapq.heappop(self._events)[1]) - - @property - def next_interval(self): - if len(self._events): - deadline = self._events[0][0] - now = time.time() - return deadline - now if deadline > now else 0 - else: - return None - - @property - def empty(self): - return super(ScheduledEvents, self).empty and len(self._events) == 0 - -def _min(a, b): - if a and b: return min(a, b) - elif a: return a - else: return b - -class SelectLoop(object): - - def __init__(self, events): - self.events = events - self.selectables = [] - self._abort = False - - def abort(self): - self._abort = True - - def add(self, selectable): - self.selectables.append(selectable) - - def remove(self, selectable): - self.selectables.remove(selectable) - - @property - def redundant(self): - return self.events.empty and not self.selectables - - @property - def aborted(self): - return self._abort - - def run(self): - while not (self._abort or self.redundant): - self.do_work() - - def do_work(self, timeout=None): - """@return True if some work was done, False if time-out expired""" - self.events.process() - if self._abort: return - - stable = False - while not stable: - reading = [] - writing = [] - closed = [] - tick = None - for s in self.selectables: - if s.reading(): reading.append(s) - if s.writing(): writing.append(s) - if s.closed(): closed.append(s) - else: tick = _min(tick, s.tick()) - - for s in closed: - self.selectables.remove(s) - s.removed() - stable = len(closed) == 0 - - if self.redundant: - return - - if timeout and timeout < 0: - timeout = 0 - if self.events.next_interval and (timeout is None or self.events.next_interval < timeout): - timeout = self.events.next_interval - if tick: - timeout = _min(tick, timeout) - if reading or writing or timeout: - readable, writable, _ = select(reading, writing, [], timeout) - for s in self.selectables: - s.tick() - for s in readable: - s.readable() - for s in writable: - s.writable() - - return bool(readable or writable) - else: - return False - - -class Handshaker(Handler): - - def on_connection_remote_open(self, event): - conn = event.connection - if conn.state & Endpoint.LOCAL_UNINIT: - conn.open() - - def on_session_remote_open(self, event): - ssn = event.session - if ssn.state & Endpoint.LOCAL_UNINIT: - ssn.open() - - def on_link_remote_open(self, event): - link = event.link - if link.state & Endpoint.LOCAL_UNINIT: - link.source.copy(link.remote_source) - link.target.copy(link.remote_target) - link.open() - - def on_connection_remote_close(self, event): - conn = event.connection - if not (conn.state & Endpoint.LOCAL_CLOSED): - conn.close() - - def on_session_remote_close(self, event): - ssn = event.session - if not (ssn.state & Endpoint.LOCAL_CLOSED): - ssn.close() - - def on_link_remote_close(self, event): - link = event.link - if not (link.state & Endpoint.LOCAL_CLOSED): - link.close() - -class FlowController(Handler): - - def __init__(self, window=1): - self.window = window - - def top_up(self, link): - delta = self.window - link.credit - link.flow(delta) - - def on_link_local_open(self, event): - if event.link.is_receiver: - self.top_up(event.link) - - def on_link_remote_open(self, event): - if event.link.is_receiver: - self.top_up(event.link) - - def on_link_flow(self, event): - if event.link.is_receiver: - self.top_up(event.link) - - def on_delivery(self, event): - if not event.delivery.released and event.delivery.link.is_receiver: - self.top_up(event.delivery.link) - -class ScopedHandler(Handler): - - scopes = { - "pn_connection": ["connection"], - "pn_session": ["session", "connection"], - "pn_link": ["link", "session", "connection"], - "pn_delivery": ["delivery", "link", "session", "connection"] - } - - def on_unhandled(self, method, args): - event = args[0] - if event.type in [Event.CONNECTION_FINAL, Event.SESSION_FINAL, Event.LINK_FINAL]: - return - objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])] - targets = [getattr(o, "context") for o in objects if hasattr(o, "context")] - handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)] - for h in handlers: - h(event) - -class OutgoingMessageHandler(Handler): - def __init__(self, auto_settle=True, delegate=None): - self.auto_settle = auto_settle - self.delegate = delegate - - def on_link_flow(self, event): - if event.link.is_sender and event.link.credit: - self.on_credit(event) - - def on_delivery(self, event): - dlv = event.delivery - if dlv.released: return - if dlv.link.is_sender and dlv.updated: - if dlv.remote_state == Delivery.ACCEPTED: - self.on_accepted(event) - elif dlv.remote_state == Delivery.REJECTED: - self.on_rejected(event) - elif dlv.remote_state == Delivery.RELEASED: - self.on_released(event) - elif dlv.remote_state == Delivery.MODIFIED: - self.on_modified(event) - if dlv.settled: - self.on_settled(event) - if self.auto_settle: - dlv.settle() - - def on_credit(self, event): - if self.delegate: - dispatch(self.delegate, 'on_credit', event) - - def on_accepted(self, event): - if self.delegate: - dispatch(self.delegate, 'on_accepted', event) - - def on_rejected(self, event): - if self.delegate: - dispatch(self.delegate, 'on_rejected', event) - - def on_released(self, event): - if self.delegate: - dispatch(self.delegate, 'on_released', event) - - def on_modified(self, event): - if self.delegate: - dispatch(self.delegate, 'on_modified', event) - - def on_settled(self, event): - if self.delegate: - dispatch(self.delegate, 'on_settled', event) - -def recv_msg(delivery): - msg = Message() - msg.decode(delivery.link.recv(delivery.pending)) - delivery.link.advance() - return msg - -class Reject(ProtonException): - """ - An exception that indicate a message should be rejected - """ - pass - -class Acking(object): - def accept(self, delivery): - self.settle(delivery, Delivery.ACCEPTED) - - def reject(self, delivery): - self.settle(delivery, Delivery.REJECTED) - - def release(self, delivery, delivered=True): - if delivered: - self.settle(delivery, Delivery.MODIFIED) - else: - self.settle(delivery, Delivery.RELEASED) - - def settle(self, delivery, state=None): - if state: - delivery.update(state) - delivery.settle() - -class IncomingMessageHandler(Handler, Acking): - def __init__(self, auto_accept=True, delegate=None): - self.delegate = delegate - self.auto_accept = auto_accept - - def on_delivery(self, event): - dlv = event.delivery - if dlv.released or not dlv.link.is_receiver: return - if dlv.readable and not dlv.partial: - event.message = recv_msg(dlv) - try: - self.on_message(event) - if self.auto_accept: - dlv.update(Delivery.ACCEPTED) - dlv.settle() - except Reject: - dlv.update(Delivery.REJECTED) - dlv.settle() - elif dlv.updated and dlv.settled: - self.on_settled(event) - - def on_message(self, event): - if self.delegate: - dispatch(self.delegate, 'on_message', event) - - def on_settled(self, event): - if self.delegate: - dispatch(self.delegate, 'on_settled', event) - -class EndpointStateHandler(Handler): - def __init__(self, peer_close_is_error=False, delegate=None): - self.delegate = delegate - self.peer_close_is_error = peer_close_is_error - - def is_local_open(self, endpoint): - return endpoint.state & Endpoint.LOCAL_ACTIVE - - def is_local_uninitialised(self, endpoint): - return endpoint.state & Endpoint.LOCAL_UNINIT - - def is_local_closed(self, endpoint): - return endpoint.state & Endpoint.LOCAL_CLOSED - - def is_remote_open(self, endpoint): - return endpoint.state & Endpoint.REMOTE_ACTIVE - - def is_remote_closed(self, endpoint): - return endpoint.state & Endpoint.REMOTE_CLOSED - - def print_error(self, endpoint, endpoint_type): - if endpoint.remote_condition: - print endpoint.remote_condition.description - elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint): - print "%s closed by peer" % endpoint_type - - def on_link_remote_close(self, event): - if event.link.remote_condition: - self.on_link_error(event) - elif self.is_local_closed(event.link): - self.on_link_closed(event) - else: - self.on_link_closing(event) - event.link.close() - - def on_session_remote_close(self, event): - if event.session.remote_condition: - self.on_session_error(event) - elif self.is_local_closed(event.session): - self.on_session_closed(event) - else: - self.on_session_closing(event) - event.session.close() - - def on_connection_remote_close(self, event): - if event.connection.remote_condition: - self.on_connection_error(event) - elif self.is_local_closed(event.connection): - self.on_connection_closed(event) - else: - self.on_connection_closing(event) - event.connection.close() - - def on_connection_local_open(self, event): - if self.is_remote_open(event.connection): - self.on_connection_opened(event) - - def on_connection_remote_open(self, event): - if self.is_local_open(event.connection): - self.on_connection_opened(event) - elif self.is_local_uninitialised(event.connection): - self.on_connection_opening(event) - event.connection.open() - - def on_session_local_open(self, event): - if self.is_remote_open(event.session): - self.on_session_opened(event) - - def on_session_remote_open(self, event): - if self.is_local_open(event.session): - self.on_session_opened(event) - elif self.is_local_uninitialised(event.session): - self.on_session_opening(event) - event.session.open() - - def on_link_local_open(self, event): - if self.is_remote_open(event.link): - self.on_link_opened(event) - - def on_link_remote_open(self, event): - if self.is_local_open(event.link): - self.on_link_opened(event) - elif self.is_local_uninitialised(event.link): - self.on_link_opening(event) - event.link.open() - - def on_connection_opened(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_opened', event) - - def on_session_opened(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_opened', event) - - def on_link_opened(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_opened', event) - - def on_connection_opening(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_opening', event) - - def on_session_opening(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_opening', event) - - def on_link_opening(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_opening', event) - - def on_connection_error(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_error', event) - else: - self.print_error(event.connection, "connection") - - def on_session_error(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_error', event) - else: - self.print_error(event.session, "session") - event.connection.close() - - def on_link_error(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_error', event) - else: - self.print_error(event.link, "link") - event.connection.close() - - def on_connection_closed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_closed', event) - - def on_session_closed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_closed', event) - - def on_link_closed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_closed', event) - - def on_connection_closing(self, event): - if self.delegate: - dispatch(self.delegate, 'on_connection_closing', event) - elif self.peer_close_is_error: - self.on_connection_error(event) - - def on_session_closing(self, event): - if self.delegate: - dispatch(self.delegate, 'on_session_closing', event) - elif self.peer_close_is_error: - self.on_session_error(event) - - def on_link_closing(self, event): - if self.delegate: - dispatch(self.delegate, 'on_link_closing', event) - elif self.peer_close_is_error: - self.on_link_error(event) - -class MessagingHandler(Handler, Acking): - def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False): - self.handlers = [] - # FlowController if used needs to see event before - # IncomingMessageHandler, as the latter may involve the - # delivery being released - if prefetch: - self.handlers.append(FlowController(prefetch)) - self.handlers.append(EndpointStateHandler(peer_close_is_error, self)) - self.handlers.append(IncomingMessageHandler(auto_accept, self)) - self.handlers.append(OutgoingMessageHandler(auto_settle, self)) - -def delivery_tags(): - count = 1 - while True: - yield str(count) - count += 1 - -def send_msg(sender, msg, tag=None, handler=None, transaction=None): - dlv = sender.delivery(tag or next(sender.tags)) - if transaction: - dlv.local.data = [transaction.id] - dlv.update(0x34) - if handler: - dlv.context = handler - sender.send(msg.encode()) - sender.advance() - return dlv - -def _send_msg(self, msg, tag=None, handler=None, transaction=None): - return send_msg(self, msg, tag, handler, transaction) - -class TransactionalAcking(object): - def accept(self, delivery, transaction): - self.settle(delivery, transaction, PN_ACCEPTED) - - def settle(self, delivery, transaction, state=None): - if state: - delivery.local.data = [transaction.id, Described(ulong(state), [])] - delivery.update(0x34) - delivery.settle() - -class TransactionHandler(OutgoingMessageHandler, TransactionalAcking): - def __init__(self, auto_settle=True, delegate=None): - super(TransactionHandler, self).__init__(auto_settle, delegate) - - def on_settled(self, event): - if hasattr(event.delivery, "transaction"): - event.transaction = event.delivery.transaction - event.delivery.transaction.handle_outcome(event) - - def on_transaction_declared(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_declared', event) - - def on_transaction_committed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_committed', event) - - def on_transaction_aborted(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_aborted', event) - - def on_transaction_declare_failed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_declare_failed', event) - - def on_transaction_commit_failed(self, event): - if self.delegate: - dispatch(self.delegate, 'on_transaction_commit_failed', event) - -class TransactionalClientHandler(Handler, TransactionalAcking): - def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False): - super(TransactionalClientHandler, self).__init__() - self.handlers = [] - # FlowController if used needs to see event before - # IncomingMessageHandler, as the latter may involve the - # delivery being released - if prefetch: - self.handlers.append(FlowController(prefetch)) - self.handlers.append(EndpointStateHandler(peer_close_is_error, self)) - self.handlers.append(IncomingMessageHandler(auto_accept, self)) - self.handlers.append(TransactionHandler(auto_settle, self)) - - -class Transaction(object): - def __init__(self, txn_ctrl, handler): - self.txn_ctrl = txn_ctrl - self.handler = handler - self.id = None - self._declare = None - self._discharge = None - self.failed = False - self.declare() - - def commit(self): - self.discharge(False) - - def abort(self): - self.discharge(True) - - def declare(self): - self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None]) - - def discharge(self, failed): - self.failed = failed - self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed]) - - def _send_ctrl(self, descriptor, value): - delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler) - delivery.transaction = self - return delivery - - def handle_outcome(self, event): - if event.delivery == self._declare: - if event.delivery.remote.data: - self.id = event.delivery.remote.data[0] - self.handler.on_transaction_declared(event) - elif event.delivery.remote_state == Delivery.REJECTED: - self.handler.on_transaction_declare_failed(event) - else: - print "Unexpected outcome for declare: %s" % event.delivery.remote_state - self.handler.on_transaction_declare_failed(event) - elif event.delivery == self._discharge: - if event.delivery.remote_state == Delivery.REJECTED: - if not self.failed: - self.handler.on_transaction_commit_failed(event) - else: - if self.failed: - self.handler.on_transaction_aborted(event) - else: - self.handler.on_transaction_committed(event) - - -class LinkOption(object): - def apply(self, link): pass - def test(self, link): return True - -class AtMostOnce(LinkOption): - def apply(self, link): - link.snd_settle_mode = Link.SND_SETTLED - -class AtLeastOnce(LinkOption): - def apply(self, link): - link.snd_settle_mode = Link.SND_UNSETTLED - link.rcv_settle_mode = Link.RCV_FIRST - -class SenderOption(LinkOption): - def apply(self, sender): pass - def test(self, link): return link.is_sender - -class ReceiverOption(LinkOption): - def apply(self, receiver): pass - def test(self, link): return link.is_receiver - -class Filter(ReceiverOption): - def __init__(self, filter_set={}): - self.filter_set = filter_set - - def apply(self, receiver): - receiver.source.filter.put_dict(self.filter_set) - -class Selector(Filter): - def __init__(self, value, name='selector'): - super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)}) - -def _apply_link_options(options, link): - if options: - if isinstance(options, list): - for o in options: - if o.test(link): o.apply(link) - else: - if options.test(link): options.apply(link) - - -class MessagingContext(object): - def __init__(self, conn, handler=None, ssn=None): - self.conn = conn - if handler: - self.conn.context = handler - self.conn._mc = self - self.ssn = ssn - self.txn_ctrl = None - - def _get_handler(self): - return self.conn.context - - def _set_handler(self, value): - self.conn.context = value - - handler = property(_get_handler, _set_handler) - - def create_sender(self, target, source=None, name=None, handler=None, tags=None, options=None): - snd = self._get_ssn().sender(name or self._get_id(target, source)) - if source: - snd.source.address = source - if target: - snd.target.address = target - if handler: - snd.context = handler - snd.tags = tags or delivery_tags() - snd.send_msg = types.MethodType(_send_msg, snd) - _apply_link_options(options, snd) - snd.open() - return snd - - def create_receiver(self, source, target=None, name=None, dynamic=False, handler=None, options=None): - rcv = self._get_ssn().receiver(name or self._get_id(source, target)) - if source: - rcv.source.address = source - if dynamic: - rcv.source.dynamic = True - if target: - rcv.target.address = target - if handler: - rcv.context = handler - _apply_link_options(options, rcv) - rcv.open() - return rcv - - def create_session(self): - return MessageContext(conn=None, ssn=self._new_ssn()) - - def declare_transaction(self, handler=None): - if not self.txn_ctrl: - self.txn_ctrl = self.create_sender(None, name="txn-ctrl") - self.txn_ctrl.target.type = Terminus.COORDINATOR - self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) - return Transaction(self.txn_ctrl, handler) - - def close(self): - if self.ssn: - self.ssn.close() - if self.conn: - self.conn.close() - - def _get_id(self, remote, local): - if local and remote: "%s-%s-%s" % (self.conn.container, remote, local) - elif local: return "%s-%s" % (self.conn.container, local) - elif remote: return "%s-%s" % (self.conn.container, remote) - else: return "%s-%s" % (self.conn.container, str(generate_uuid())) - - def _get_ssn(self): - if not self.ssn: - self.ssn = self._new_ssn() - self.ssn.context = self - return self.ssn - - def _new_ssn(self): - ssn = self.conn.session() - ssn.open() - return ssn - - def on_session_remote_close(self, event): - if self.conn: - self.conn.close() - -class Connector(Handler): - def attach_to(self, loop): - self.loop = loop - - def _connect(self, connection): - host, port = connection.address.next() - #print "connecting to %s:%i" % (host, port) - heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None - self.loop.add(AmqpConnection(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port)) - - def on_connection_local_open(self, event): - if hasattr(event.connection, "address"): - self._connect(event.connection) - - def on_connection_remote_open(self, event): - if hasattr(event.connection, "reconnect"): - event.connection.reconnect.reset() - - def on_disconnected(self, event): - if hasattr(event.connection, "reconnect"): - delay = event.connection.reconnect.next() - if delay == 0: - print "Disconnected, reconnecting..." - self._connect(event.connection) - else: - print "Disconnected will try to reconnect after %s seconds" % delay - self.loop.schedule(time.time() + delay, connection=event.connection, subject=self) - else: - print "Disconnected" - - def on_timer(self, event): - if event.subject == self and event.connection: - self._connect(event.connection) - -class Backoff(object): - def __init__(self): - self.delay = 0 - - def reset(self): - self.delay = 0 - - def next(self): - current = self.delay - if current == 0: - self.delay = 0.1 - else: - self.delay = min(10, 2*current) - return current - -class Urls(object): - def __init__(self, values): - self.values = [Url(v) for v in values] - self.i = iter(self.values) - - def __iter__(self): - return self - - def _as_pair(self, url): - return (url.host, url.port) - - def next(self): - try: - return self._as_pair(self.i.next()) - except StopIteration: - self.i = iter(self.values) - return self._as_pair(self.i.next()) - -class EventLoop(object): - def __init__(self, *handlers): - self.connector = Connector() - h = [self.connector, ScopedHandler()] - h.extend(nested_handlers(handlers)) - self.events = ScheduledEvents(*h) - self.loop = SelectLoop(self.events) - self.connector.attach_to(self) - self.trigger = None - self.container_id = str(generate_uuid()) - - def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None): - context = MessagingContext(self.events.connection(), handler=handler) - context.conn.container = self.container_id or str(generate_uuid()) - context.conn.heartbeat = heartbeat - if url: context.conn.address = Urls([url]) - elif urls: context.conn.address = Urls(urls) - elif address: context.conn.address = address - else: raise ValueError("One of url, urls or address required") - if reconnect: - context.conn.reconnect = reconnect - elif reconnect is None: - context.conn.reconnect = Backoff() - context.conn.open() - return context - - def listen(self, url): - host, port = Url(url).next() - return Acceptor(self.events, self, host, port) - - def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None): - self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject)) - - def get_event_trigger(self): - if not self.trigger or self.trigger.closed(): - self.trigger = EventInjector(self.events) - self.add(self.trigger) - return self.trigger - - def add(self, selectable): - self.loop.add(selectable) - - def remove(self, selectable): - self.loop.remove(selectable) - - def run(self): - self.events.dispatch(StartEvent(self)) - self.loop.run() - - def stop(self): - self.loop.abort() - - def do_work(self, timeout=None): - return self.loop.do_work(timeout) - -EventLoop.DEFAULT = EventLoop() - -def connect(url=None, urls=None, address=None, handler=None, reconnect=None, eventloop=None, heartbeat=None): - if not eventloop: - eventloop = EventLoop.DEFAULT - return eventloop.connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect, heartbeat=heartbeat) - -def run(eventloop=None): - if not eventloop: - eventloop = EventLoop.DEFAULT - eventloop.run() - -class BlockingLink(object): - def __init__(self, connection, link): - self.connection = connection - self.link = link - self.connection.wait(lambda: not (self.link.state & Endpoint.REMOTE_UNINIT), - msg="Opening link %s" % link.name) - - def close(self): - self.connection.wait(not (self.link.state & Endpoint.REMOTE_ACTIVE), - msg="Closing link %s" % link.name) - - # Access to other link attributes. - def __getattr__(self, name): return getattr(self.link, name) - -class BlockingSender(BlockingLink): - def __init__(self, connection, sender): - super(BlockingSender, self).__init__(connection, sender) - - def send_msg(self, msg): - delivery = send_msg(self.link, msg) - self.connection.wait(lambda: delivery.settled, msg="Sending on sender %s" % self.link.name) - -class BlockingReceiver(BlockingLink): - def __init__(self, connection, receiver, credit=1): - super(BlockingReceiver, self).__init__(connection, receiver) - if credit: receiver.flow(credit) - -class BlockingConnection(Handler): - def __init__(self, url, timeout=None): - self.timeout = timeout - self.events = Events(ScopedHandler()) - self.loop = SelectLoop(self.events) - self.context = MessagingContext(self.loop.events.connection(), handler=self) - if isinstance(url, basestring): - self.url = Url(url) - else: - self.url = url - self.loop.add( - AmqpConnection(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port)) - self.context.conn.open() - self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT), - msg="Opening connection") - - def create_sender(self, address, handler=None): - return BlockingSender(self, self.context.create_sender(address, handler=handler)) - - def create_receiver(self, address, credit=1, dynamic=False, handler=None): - return BlockingReceiver( - self, self.context.create_receiver(address, dynamic=dynamic, handler=handler), credit=credit) - - def close(self): - self.context.conn.close() - self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE), - msg="Closing connection") - - def run(self): - """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ - self.loop.run() - - def wait(self, condition, timeout=False, msg=None): - """Call do_work until condition() is true""" - if timeout is False: - timeout = self.timeout - if timeout is None: - while not condition(): - self.loop.do_work() - else: - deadline = time.time() + timeout - while not condition(): - if not self.loop.do_work(deadline - time.time()): - txt = "Connection %s timed out" % self.url - if msg: txt += ": " + msg - raise Timeout(txt) - - def on_link_remote_close(self, event): - if event.link.state & Endpoint.LOCAL_ACTIVE: - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - if event.connection.state & Endpoint.LOCAL_ACTIVE: - self.closed(event.connection.remote_condition) - - def on_disconnected(self, event): - raise ConnectionException("Connection %s disconnected" % self.url); - - def closed(self, error=None): - txt = "Connection %s closed" % self.url - if error: - txt += " due to: %s" % error - else: - txt += " by peer" - raise ConnectionException(txt) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/proton_reactors.py ---------------------------------------------------------------------- diff --git a/tutorial/proton_reactors.py b/tutorial/proton_reactors.py new file mode 100644 index 0000000..28f287d --- /dev/null +++ b/tutorial/proton_reactors.py @@ -0,0 +1,724 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import heapq, os, Queue, socket, time, types +from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url +from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout +from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException +from select import select +from proton_handlers import nested_handlers, ScopedHandler + + +class AmqpSocket(object): + + def __init__(self, conn, sock, events, heartbeat=None): + self.events = events + self.conn = conn + self.transport = Transport() + if heartbeat: self.transport.idle_timeout = heartbeat + self.transport.bind(self.conn) + self.socket = sock + self.socket.setblocking(0) + self.socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.write_done = False + self.read_done = False + self._closed = False + + def accept(self, force_sasl=True): + if force_sasl: + sasl = self.transport.sasl() + sasl.mechanisms("ANONYMOUS") + sasl.server() + sasl.done(SASL.OK) + #TODO: use SASL anyway if requested by peer + return self + + def connect(self, host, port=None, username=None, password=None, force_sasl=True): + if username and password: + sasl = self.transport.sasl() + sasl.plain(username, password) + elif force_sasl: + sasl = self.transport.sasl() + sasl.mechanisms('ANONYMOUS') + sasl.client() + try: + self.socket.connect_ex((host, port or 5672)) + except socket.gaierror, e: + raise ConnectionException("Cannot resolve '%s': %s" % (host, e)) + return self + + def _closed_cleanly(self): + return self.conn.state & Endpoint.LOCAL_CLOSED and self.conn.state & Endpoint.REMOTE_CLOSED + + def closed(self): + if not self._closed and self.write_done and self.read_done: + self.close() + return True + else: + return False + + def close(self): + self.socket.close() + self._closed = True + + def fileno(self): + return self.socket.fileno() + + def reading(self): + if self.read_done: return False + c = self.transport.capacity() + if c > 0: + return True + elif c < 0: + self.read_done = True + return False + + def writing(self): + if self.write_done: return False + try: + p = self.transport.pending() + if p > 0: + return True + elif p < 0: + self.write_done = True + return False + else: # p == 0 + return False + except TransportException, e: + self.write_done = True + return False + + def readable(self): + c = self.transport.capacity() + if c > 0: + try: + data = self.socket.recv(c) + if data: + self.transport.push(data) + else: + if not self._closed_cleanly(): + self.read_done = True + self.write_done = True + else: + self.transport.close_tail() + except TransportException, e: + print "Error on read: %s" % e + self.read_done = True + except socket.error, e: + print "Error on recv: %s" % e + self.read_done = True + self.write_done = True + elif c < 0: + self.read_done = True + + def writable(self): + try: + p = self.transport.pending() + if p > 0: + data = self.transport.peek(p) + n = self.socket.send(data) + self.transport.pop(n) + elif p < 0: + self.write_done = True + except TransportException, e: + print "Error on write: %s" % e + self.write_done = True + except socket.error, e: + print "Error on send: %s" % e + self.write_done = True + + def removed(self): + if not self._closed_cleanly(): + self.transport.unbind() + self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn)) + + def tick(self): + t = self.transport.tick(time.time()) + if t: return t - time.time() + else: return None + +class Acceptor: + + def __init__(self, events, loop, host, port): + self.events = events + self.loop = loop + self.socket = socket.socket() + self.socket.setblocking(0) + self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.socket.bind((host, port)) + self.socket.listen(5) + self.loop.add(self) + self._closed = False + + def closed(self): + if self._closed: + self.socket.close() + return True + else: + return False + + def close(self): + self._closed = True + + def fileno(self): + return self.socket.fileno() + + def reading(self): + return not self._closed + + def writing(self): + return False + + def readable(self): + sock, addr = self.socket.accept() + if sock: + self.loop.add(AmqpSocket(self.events.connection(), sock, self.events).accept()) + + def removed(self): pass + def tick(self): return None + +class EventInjector(object): + def __init__(self, events): + self.events = events + self.queue = Queue.Queue() + self.pipe = os.pipe() + self._closed = False + + def trigger(self, event): + self.queue.put(event) + os.write(self.pipe[1], "!") + + def closed(self): + return self._closed and self.queue.empty() + + def close(self): + self._closed = True + + def fileno(self): + return self.pipe[0] + + def reading(self): + return True + + def writing(self): + return False + + def readable(self): + os.read(self.pipe[0], 512) + while not self.queue.empty(): + self.events.dispatch(self.queue.get()) + + def removed(self): pass + def tick(self): return None + +class Events(object): + def __init__(self, *handlers): + self.collector = Collector() + self.handlers = handlers + + def connection(self): + conn = Connection() + conn.collect(self.collector) + return conn + + def process(self): + while True: + ev = self.collector.peek() + if ev: + self.dispatch(ev) + self.collector.pop() + else: + return + + def dispatch(self, event): + for h in self.handlers: + event.dispatch(h) + + @property + def next_interval(self): + return None + + @property + def empty(self): + return self.collector.peek() == None + +class ExtendedEventType(object): + def __init__(self, name): + self.name = name + self.method = "on_%s" % name + +class ApplicationEvent(Event): + def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): + self.type = ExtendedEventType(typename) + self.subject = subject + if delivery: + self.context = delivery + self.clazz = "pn_delivery" + elif link: + self.context = link + self.clazz = "pn_link" + elif session: + self.context = session + self.clazz = "pn_session" + elif connection: + self.context = connection + self.clazz = "pn_connection" + else: + self.context = None + self.clazz = "none" + + def __repr__(self): + objects = [self.context, self.subject] + return "%s(%s)" % (self.type.name, + ", ".join([str(o) for o in objects if o is not None])) + +class StartEvent(ApplicationEvent): + def __init__(self, reactor): + super(StartEvent, self).__init__("start") + self.reactor = reactor + +class ScheduledEvents(Events): + def __init__(self, *handlers): + super(ScheduledEvents, self).__init__(*handlers) + self._events = [] + + def schedule(self, deadline, event): + heapq.heappush(self._events, (deadline, event)) + + def process(self): + super(ScheduledEvents, self).process() + while self._events and self._events[0][0] <= time.time(): + self.dispatch(heapq.heappop(self._events)[1]) + + @property + def next_interval(self): + if len(self._events): + deadline = self._events[0][0] + now = time.time() + return deadline - now if deadline > now else 0 + else: + return None + + @property + def empty(self): + return super(ScheduledEvents, self).empty and len(self._events) == 0 + +def _min(a, b): + if a and b: return min(a, b) + elif a: return a + else: return b + +class SelectLoop(object): + + def __init__(self, events): + self.events = events + self.selectables = [] + self._abort = False + + def abort(self): + self._abort = True + + def add(self, selectable): + self.selectables.append(selectable) + + def remove(self, selectable): + self.selectables.remove(selectable) + + @property + def redundant(self): + return self.events.empty and not self.selectables + + @property + def aborted(self): + return self._abort + + def run(self): + while not (self._abort or self.redundant): + self.do_work() + + def do_work(self, timeout=None): + """@return True if some work was done, False if time-out expired""" + self.events.process() + if self._abort: return + + stable = False + while not stable: + reading = [] + writing = [] + closed = [] + tick = None + for s in self.selectables: + if s.reading(): reading.append(s) + if s.writing(): writing.append(s) + if s.closed(): closed.append(s) + else: tick = _min(tick, s.tick()) + + for s in closed: + self.selectables.remove(s) + s.removed() + stable = len(closed) == 0 + + if self.redundant: + return + + if timeout and timeout < 0: + timeout = 0 + if self.events.next_interval and (timeout is None or self.events.next_interval < timeout): + timeout = self.events.next_interval + if tick: + timeout = _min(tick, timeout) + if reading or writing or timeout: + readable, writable, _ = select(reading, writing, [], timeout) + for s in self.selectables: + s.tick() + for s in readable: + s.readable() + for s in writable: + s.writable() + + return bool(readable or writable) + else: + return False + +def delivery_tags(): + count = 1 + while True: + yield str(count) + count += 1 + +def send_msg(sender, msg, tag=None, handler=None, transaction=None): + dlv = sender.delivery(tag or next(sender.tags)) + if transaction: + dlv.local.data = [transaction.id] + dlv.update(0x34) + if handler: + dlv.context = handler + sender.send(msg.encode()) + sender.advance() + return dlv + +def _send_msg(self, msg, tag=None, handler=None, transaction=None): + return send_msg(self, msg, tag, handler, transaction) + + +class Transaction(object): + def __init__(self, txn_ctrl, handler): + self.txn_ctrl = txn_ctrl + self.handler = handler + self.id = None + self._declare = None + self._discharge = None + self.failed = False + self.declare() + + def commit(self): + self.discharge(False) + + def abort(self): + self.discharge(True) + + def declare(self): + self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None]) + + def discharge(self, failed): + self.failed = failed + self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed]) + + def _send_ctrl(self, descriptor, value): + delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler) + delivery.transaction = self + return delivery + + def handle_outcome(self, event): + if event.delivery == self._declare: + if event.delivery.remote.data: + self.id = event.delivery.remote.data[0] + self.handler.on_transaction_declared(event) + elif event.delivery.remote_state == Delivery.REJECTED: + self.handler.on_transaction_declare_failed(event) + else: + print "Unexpected outcome for declare: %s" % event.delivery.remote_state + self.handler.on_transaction_declare_failed(event) + elif event.delivery == self._discharge: + if event.delivery.remote_state == Delivery.REJECTED: + if not self.failed: + self.handler.on_transaction_commit_failed(event) + else: + if self.failed: + self.handler.on_transaction_aborted(event) + else: + self.handler.on_transaction_committed(event) + + +class LinkOption(object): + def apply(self, link): pass + def test(self, link): return True + +class AtMostOnce(LinkOption): + def apply(self, link): + link.snd_settle_mode = Link.SND_SETTLED + +class AtLeastOnce(LinkOption): + def apply(self, link): + link.snd_settle_mode = Link.SND_UNSETTLED + link.rcv_settle_mode = Link.RCV_FIRST + +class SenderOption(LinkOption): + def apply(self, sender): pass + def test(self, link): return link.is_sender + +class ReceiverOption(LinkOption): + def apply(self, receiver): pass + def test(self, link): return link.is_receiver + +class Filter(ReceiverOption): + def __init__(self, filter_set={}): + self.filter_set = filter_set + + def apply(self, receiver): + receiver.source.filter.put_dict(self.filter_set) + +class Selector(Filter): + def __init__(self, value, name='selector'): + super(Selector, self).__init__({symbol(name): Described(symbol('apache.org:selector-filter:string'), value)}) + +def _apply_link_options(options, link): + if options: + if isinstance(options, list): + for o in options: + if o.test(link): o.apply(link) + else: + if options.test(link): options.apply(link) + + +class MessagingContext(object): + def __init__(self, conn, handler=None, ssn=None): + self.conn = conn + if handler: + self.conn.context = handler + self.conn._mc = self + self.ssn = ssn + self.txn_ctrl = None + + def _get_handler(self): + return self.conn.context + + def _set_handler(self, value): + self.conn.context = value + + handler = property(_get_handler, _set_handler) + + def create_sender(self, target, source=None, name=None, handler=None, tags=None, options=None): + snd = self._get_ssn().sender(name or self._get_id(target, source)) + if source: + snd.source.address = source + if target: + snd.target.address = target + if handler: + snd.context = handler + snd.tags = tags or delivery_tags() + snd.send_msg = types.MethodType(_send_msg, snd) + _apply_link_options(options, snd) + snd.open() + return snd + + def create_receiver(self, source, target=None, name=None, dynamic=False, handler=None, options=None): + rcv = self._get_ssn().receiver(name or self._get_id(source, target)) + if source: + rcv.source.address = source + if dynamic: + rcv.source.dynamic = True + if target: + rcv.target.address = target + if handler: + rcv.context = handler + _apply_link_options(options, rcv) + rcv.open() + return rcv + + def create_session(self): + return MessageContext(conn=None, ssn=self._new_ssn()) + + def declare_transaction(self, handler=None): + if not self.txn_ctrl: + self.txn_ctrl = self.create_sender(None, name="txn-ctrl") + self.txn_ctrl.target.type = Terminus.COORDINATOR + self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) + return Transaction(self.txn_ctrl, handler) + + def close(self): + if self.ssn: + self.ssn.close() + if self.conn: + self.conn.close() + + def _get_id(self, remote, local): + if local and remote: "%s-%s-%s" % (self.conn.container, remote, local) + elif local: return "%s-%s" % (self.conn.container, local) + elif remote: return "%s-%s" % (self.conn.container, remote) + else: return "%s-%s" % (self.conn.container, str(generate_uuid())) + + def _get_ssn(self): + if not self.ssn: + self.ssn = self._new_ssn() + self.ssn.context = self + return self.ssn + + def _new_ssn(self): + ssn = self.conn.session() + ssn.open() + return ssn + + def on_session_remote_close(self, event): + if self.conn: + self.conn.close() + +class Connector(Handler): + def attach_to(self, loop): + self.loop = loop + + def _connect(self, connection): + host, port = connection.address.next() + #print "connecting to %s:%i" % (host, port) + heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None + self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port)) + + def on_connection_local_open(self, event): + if hasattr(event.connection, "address"): + self._connect(event.connection) + + def on_connection_remote_open(self, event): + if hasattr(event.connection, "reconnect"): + event.connection.reconnect.reset() + + def on_disconnected(self, event): + if hasattr(event.connection, "reconnect"): + delay = event.connection.reconnect.next() + if delay == 0: + print "Disconnected, reconnecting..." + self._connect(event.connection) + else: + print "Disconnected will try to reconnect after %s seconds" % delay + self.loop.schedule(time.time() + delay, connection=event.connection, subject=self) + else: + print "Disconnected" + + def on_timer(self, event): + if event.subject == self and event.connection: + self._connect(event.connection) + +class Backoff(object): + def __init__(self): + self.delay = 0 + + def reset(self): + self.delay = 0 + + def next(self): + current = self.delay + if current == 0: + self.delay = 0.1 + else: + self.delay = min(10, 2*current) + return current + +class Urls(object): + def __init__(self, values): + self.values = [Url(v) for v in values] + self.i = iter(self.values) + + def __iter__(self): + return self + + def _as_pair(self, url): + return (url.host, url.port) + + def next(self): + try: + return self._as_pair(self.i.next()) + except StopIteration: + self.i = iter(self.values) + return self._as_pair(self.i.next()) + +class EventLoop(object): + def __init__(self, *handlers): + self.connector = Connector() + h = [self.connector, ScopedHandler()] + h.extend(nested_handlers(handlers)) + self.events = ScheduledEvents(*h) + self.loop = SelectLoop(self.events) + self.connector.attach_to(self) + self.trigger = None + self.container_id = str(generate_uuid()) + + def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None): + context = MessagingContext(self.events.connection(), handler=handler) + context.conn.container = self.container_id or str(generate_uuid()) + context.conn.heartbeat = heartbeat + if url: context.conn.address = Urls([url]) + elif urls: context.conn.address = Urls(urls) + elif address: context.conn.address = address + else: raise ValueError("One of url, urls or address required") + if reconnect: + context.conn.reconnect = reconnect + elif reconnect is None: + context.conn.reconnect = Backoff() + context.conn.open() + return context + + def listen(self, url): + host, port = Urls([url]).next() + return Acceptor(self.events, self, host, port) + + def schedule(self, deadline, connection=None, session=None, link=None, delivery=None, subject=None): + self.events.schedule(deadline, ApplicationEvent("timer", connection, session, link, delivery, subject)) + + def get_event_trigger(self): + if not self.trigger or self.trigger.closed(): + self.trigger = EventInjector(self.events) + self.add(self.trigger) + return self.trigger + + def add(self, selectable): + self.loop.add(selectable) + + def remove(self, selectable): + self.loop.remove(selectable) + + def run(self): + self.events.dispatch(StartEvent(self)) + self.loop.run() + + def stop(self): + self.loop.abort() + + def do_work(self, timeout=None): + return self.loop.do_work(timeout) + +EventLoop.DEFAULT = EventLoop() + +def connect(url=None, urls=None, address=None, handler=None, reconnect=None, eventloop=None, heartbeat=None): + if not eventloop: + eventloop = EventLoop.DEFAULT + return eventloop.connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect, heartbeat=heartbeat) + +def run(eventloop=None): + if not eventloop: + eventloop = EventLoop.DEFAULT + eventloop.run() + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/proton_server.py ---------------------------------------------------------------------- diff --git a/tutorial/proton_server.py b/tutorial/proton_server.py index 04f0eb7..3ebf366 100644 --- a/tutorial/proton_server.py +++ b/tutorial/proton_server.py @@ -18,10 +18,12 @@ # from proton import Message -from proton_events import EventLoop, FlowController, IncomingMessageHandler +from proton_reactors import EventLoop +from proton_handlers import FlowController, IncomingMessageHandler class Server(IncomingMessageHandler): def __init__(self, host, address): + super(Server, self).__init__() self.eventloop = EventLoop(self, FlowController(10)) self.conn = self.eventloop.connect(host) self.receiver = self.conn.create_receiver(address) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/proton_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/proton_tornado.py b/tutorial/proton_tornado.py index 0275169..a5ec82c 100644 --- a/tutorial/proton_tornado.py +++ b/tutorial/proton_tornado.py @@ -18,7 +18,7 @@ # under the License. # -from proton_events import ApplicationEvent, EventLoop, StartEvent +from proton_reactors import ApplicationEvent, EventLoop, StartEvent import tornado.ioloop class TornadoLoop(EventLoop): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/recurring_timer.py ---------------------------------------------------------------------- diff --git a/tutorial/recurring_timer.py b/tutorial/recurring_timer.py index c4fed72..2bd7b9f 100755 --- a/tutorial/recurring_timer.py +++ b/tutorial/recurring_timer.py @@ -19,7 +19,7 @@ # import time -from proton_events import EventLoop, Handler +from proton_reactors import EventLoop, Handler class Recurring(Handler): def __init__(self, period): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/recurring_timer_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/recurring_timer_tornado.py b/tutorial/recurring_timer_tornado.py index d94e559..f4ca260 100755 --- a/tutorial/recurring_timer_tornado.py +++ b/tutorial/recurring_timer_tornado.py @@ -19,7 +19,7 @@ # import time -from proton_events import Handler +from proton import Handler from proton_tornado import TornadoLoop class Recurring(Handler): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/selected_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/selected_recv.py b/tutorial/selected_recv.py index c6d1570..b31862a 100755 --- a/tutorial/selected_recv.py +++ b/tutorial/selected_recv.py @@ -18,16 +18,20 @@ # under the License. # -import proton_events +import proton_reactors +from proton_handlers import MessagingHandler + +class Recv(MessagingHandler): + def __init__(self): + super(Recv, self).__init__() -class Recv(proton_events.ClientHandler): def on_message(self, event): print event.message.body try: - conn = proton_events.connect("localhost:5672", handler=Recv()) - conn.create_receiver("examples", options=proton_events.Selector(u"colour = 'green'")) - proton_events.run() + conn = proton_reactors.connect("localhost:5672", handler=Recv()) + conn.create_receiver("examples", options=proton_reactors.Selector(u"colour = 'green'")) + proton_reactors.run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/server.py ---------------------------------------------------------------------- diff --git a/tutorial/server.py b/tutorial/server.py index c426fc3..3969688 100755 --- a/tutorial/server.py +++ b/tutorial/server.py @@ -19,7 +19,8 @@ # from proton import Message -from proton_events import EventLoop, MessagingHandler +from proton_handlers import MessagingHandler +from proton_reactors import EventLoop class Server(MessagingHandler): def __init__(self, host, address): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/server_tx.py ---------------------------------------------------------------------- diff --git a/tutorial/server_tx.py b/tutorial/server_tx.py index c366d08..179c1c8 100755 --- a/tutorial/server_tx.py +++ b/tutorial/server_tx.py @@ -19,7 +19,8 @@ # from proton import Message -from proton_events import EventLoop, MessagingHandler, TransactionHandler +from proton_reactors import EventLoop +from proton_handlers import MessagingHandler, TransactionHandler class TxRequest(TransactionHandler): def __init__(self, response, sender, request_delivery, context): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/simple_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/simple_recv.py b/tutorial/simple_recv.py index 7b58553..aac549c 100755 --- a/tutorial/simple_recv.py +++ b/tutorial/simple_recv.py @@ -18,7 +18,8 @@ # under the License. # -from proton_events import EventLoop, MessagingHandler +from proton_handlers import MessagingHandler +from proton_reactors import EventLoop class Recv(MessagingHandler): def __init__(self, host, address): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/simple_send.py ---------------------------------------------------------------------- diff --git a/tutorial/simple_send.py b/tutorial/simple_send.py index 7c86718..3af5cff 100755 --- a/tutorial/simple_send.py +++ b/tutorial/simple_send.py @@ -19,7 +19,8 @@ # from proton import Message -from proton_events import EventLoop, MessagingHandler +from proton_handlers import MessagingHandler +from proton_reactors import EventLoop class Send(MessagingHandler): def __init__(self, host, address, messages): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/sync_client.py ---------------------------------------------------------------------- diff --git a/tutorial/sync_client.py b/tutorial/sync_client.py index fae351c..eb79fc5 100755 --- a/tutorial/sync_client.py +++ b/tutorial/sync_client.py @@ -25,7 +25,8 @@ Demonstrates the client side of the synchronous request-response pattern """ from proton import Message, Url, ConnectionException, Timeout -from proton_events import BlockingConnection, IncomingMessageHandler +from proton_utils import BlockingConnection +from proton_handlers import IncomingMessageHandler import sys class SyncRequestClient(IncomingMessageHandler): @@ -39,6 +40,7 @@ class SyncRequestClient(IncomingMessageHandler): @param url: a proton.Url or a URL string of the form 'host:port/path' host:port is used to connect, path is used to identify the remote messaging endpoint. """ + super(SyncRequestClient, self).__init__() self.connection = BlockingConnection(Url(url).defaults(), timeout=timeout) self.sender = self.connection.create_sender(url.path) # dynamic=true generates a unique address dynamically for this receiver. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/tx_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_recv.py b/tutorial/tx_recv.py index 7d87c94..5f14fd8 100755 --- a/tutorial/tx_recv.py +++ b/tutorial/tx_recv.py @@ -18,14 +18,15 @@ # under the License. # -import proton_events +from proton_reactors import EventLoop +from proton_handlers import TransactionalClientHandler -class TxRecv(proton_events.TransactionalClientHandler): +class TxRecv(TransactionalClientHandler): def __init__(self, batch_size): super(TxRecv, self).__init__(prefetch=0) self.current_batch = 0 self.batch_size = batch_size - self.event_loop = proton_events.EventLoop(self) + self.event_loop = EventLoop(self) self.conn = self.event_loop.connect("localhost:5672") self.receiver = self.conn.create_receiver("examples") self.conn.declare_transaction(handler=self) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/tx_send.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_send.py b/tutorial/tx_send.py index 7fc3951..44c78bf 100755 --- a/tutorial/tx_send.py +++ b/tutorial/tx_send.py @@ -19,9 +19,10 @@ # from proton import Message -import proton_events +from proton_reactors import EventLoop +from proton_handlers import TransactionalClientHandler -class TxSend(proton_events.TransactionalClientHandler): +class TxSend(TransactionalClientHandler): def __init__(self, messages, batch_size): super(TxSend, self).__init__() self.current_batch = 0 @@ -29,7 +30,8 @@ class TxSend(proton_events.TransactionalClientHandler): self.confirmed = 0 self.total = messages self.batch_size = batch_size - self.conn = proton_events.connect("localhost:5672", handler=self) + self.eventloop = EventLoop() + self.conn = self.eventloop.connect("localhost:5672", handler=self) self.sender = self.conn.create_sender("examples") self.conn.declare_transaction(handler=self) self.transaction = None @@ -67,7 +69,7 @@ class TxSend(proton_events.TransactionalClientHandler): self.current_batch = 0 def run(self): - proton_events.run() + self.eventloop.run() try: TxSend(10000, 10).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4706c2b7/tutorial/tx_send_sync.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_send_sync.py b/tutorial/tx_send_sync.py index 66679f9..6e4d4d8 100755 --- a/tutorial/tx_send_sync.py +++ b/tutorial/tx_send_sync.py @@ -19,9 +19,10 @@ # from proton import Message -import proton_events +from proton_reactors import EventLoop +from proton_handlers import TransactionalClientHandler -class TxSend(proton_events.TransactionalClientHandler): +class TxSend(TransactionalClientHandler): def __init__(self, messages, batch_size): super(TxSend, self).__init__() self.current_batch = 0 @@ -29,7 +30,8 @@ class TxSend(proton_events.TransactionalClientHandler): self.committed = 0 self.total = messages self.batch_size = batch_size - self.conn = proton_events.connect("localhost:5672", handler=self) + self.eventloop = EventLoop() + self.conn = self.eventloop.connect("localhost:5672", handler=self) self.sender = self.conn.create_sender("examples") self.conn.declare_transaction(handler=self) self.transaction = None @@ -68,7 +70,7 @@ class TxSend(proton_events.TransactionalClientHandler): self.current_batch = 0 def run(self): - proton_events.run() + self.eventloop.run() try: TxSend(10000, 10).run() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
