Author: gsim
Date: Wed Oct 1 17:37:09 2014
New Revision: 1628781
URL: http://svn.apache.org/r1628781

Log:
Some simplifications

Modified: qpid/proton/branches/examples/tutorial/db_recv.py qpid/proton/branches/examples/tutorial/db_send.py qpid/proton/branches/examples/tutorial/helloworld.py qpid/proton/branches/examples/tutorial/helloworld_alt.py qpid/proton/branches/examples/tutorial/helloworld_direct.py qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py qpid/proton/branches/examples/tutorial/helloworld_simple.py qpid/proton/branches/examples/tutorial/proton_events.py qpid/proton/branches/examples/tutorial/simple_recv.py qpid/proton/branches/examples/tutorial/simple_send.py Modified: qpid/proton/branches/examples/tutorial/db_recv.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_recv.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/db_recv.py (original) +++ qpid/proton/branches/examples/tutorial/db_recv.py Wed Oct 1 17:37:09 2014 @@ -18,23 +18,20 @@ # under the License. 
# -import time -from proton_events import ApplicationEvent, IncomingMessageHandler, EventLoop, FlowController +from proton_events import ApplicationEvent, BaseHandler, EventLoop from db_common import Db -class Recv(IncomingMessageHandler): +class Recv(BaseHandler): def __init__(self, host, address): - self.eventloop = EventLoop()#self, FlowController(10)) + self.eventloop = EventLoop() self.host = host self.address = address self.delay = 0 self.db = Db("dst_db", self.eventloop.get_event_trigger()) # TODO: load last tag from db self.last_id = None - self.connect() - - def connect(self): self.conn = self.eventloop.connect(self.host, handler=self) + self.conn.receiver(self.address) def auto_accept(self): return False @@ -50,35 +47,6 @@ class Recv(IncomingMessageHandler): else: self.accept(event.delivery) - def on_connection_remote_open(self, event): - self.delay = 0 - self.conn.receiver(self.address) - - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error - self.conn.close() - - def on_disconnected(self, conn): - if self.delay == 0: - self.delay = 0.1 - print "Disconnected, reconnecting..." - self.connect() - else: - print "Disconnected will try to reconnect after %d seconds" % self.delay - self.eventloop.schedule(time.time() + self.delay, connection=conn) - self.delay = min(10, 2*self.delay) - - def on_timer(self, event): - print "Reconnecting..." 
- self.connect() - def run(self): self.eventloop.run() Modified: qpid/proton/branches/examples/tutorial/db_send.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/db_send.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/db_send.py (original) +++ qpid/proton/branches/examples/tutorial/db_send.py Wed Oct 1 17:37:09 2014 @@ -21,10 +21,10 @@ import Queue import time from proton import Message -from proton_events import ApplicationEvent, EventLoop, OutgoingMessageHandler +from proton_events import ApplicationEvent, BaseHandler, EventLoop from db_common import Db -class Send(OutgoingMessageHandler): +class Send(BaseHandler): def __init__(self, host, address): self.eventloop = EventLoop() self.address = address @@ -33,10 +33,8 @@ class Send(OutgoingMessageHandler): self.sent = 0 self.records = Queue.Queue(maxsize=50) self.db = Db("src_db", self.eventloop.get_event_trigger()) - self.connect() - - def connect(self): self.conn = self.eventloop.connect(self.host, handler=self) + self.sender = self.conn.sender(self.address) def on_records_loaded(self, event): if self.records.empty() and event.subject == self.sent: @@ -50,7 +48,7 @@ class Send(OutgoingMessageHandler): if not self.records.full(): self.db.load(self.records, event=ApplicationEvent("records_loaded", link=self.sender, subject=self.sent)) - def on_link_flow(self, event): + def on_credit(self, event): self.send() def send(self): @@ -67,37 +65,11 @@ class Send(OutgoingMessageHandler): self.db.delete(id) print "settled message %s" % id - def on_connection_remote_open(self, event): + def on_disconnected(self, event): self.db.reset() - self.sender = self.conn.sender(self.address) - self.delay = 0 - - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - 
- def closed(self, error=None): - if error: - print "Closed due to %s" % error - self.conn.close() - - def on_disconnected(self, conn): - if self.delay == 0: - self.delay = 0.1 - print "Disconnected, reconnecting..." - self.connect() - else: - print "Disconnected will try to reconnect after %d seconds" % self.delay - self.eventloop.schedule(time.time() + self.delay, connection=conn, subject="reconnect") - self.delay = min(10, 2*self.delay) def on_timer(self, event): - if event.subject == "reconnect": - print "Reconnecting..." - self.connect() - elif event.subject == "data": + if event.subject == "data": print "Rechecking for data..." self.request_records() Modified: qpid/proton/branches/examples/tutorial/helloworld.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/helloworld.py (original) +++ qpid/proton/branches/examples/tutorial/helloworld.py Wed Oct 1 17:37:09 2014 @@ -19,44 +19,30 @@ # from proton import Message -from proton_events import EventLoop, IncomingMessageHandler +from proton_events import ErrorHandler, EventLoop, IncomingMessageHandler, OutgoingMessageHandler class HelloWorldReceiver(IncomingMessageHandler): def on_message(self, event): print event.message.body event.connection.close() -class HelloWorldSender(object): - def on_link_flow(self, event): +class HelloWorldSender(OutgoingMessageHandler): + def on_credit(self, event): event.link.send_msg(Message(body=u"Hello World!")) event.link.close() -class HelloWorld(object): - def __init__(self, eventloop, url, address): - self.eventloop = eventloop - self.conn = eventloop.connect(url, handler=self) +class HelloWorld(ErrorHandler): + def __init__(self, url, address): + self.eventloop = EventLoop() + self.conn = self.eventloop.connect(url, handler=self) self.address = address def 
on_connection_remote_open(self, event): self.conn.receiver(self.address, handler=HelloWorldReceiver()) - - def on_link_remote_open(self, event): - if event.link.is_receiver: - self.conn.sender(self.address, handler=HelloWorldSender()) - - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error - self.conn.close() + self.conn.sender(self.address, handler=HelloWorldSender()) def run(self): self.eventloop.run() -HelloWorld(EventLoop(), "localhost:5672", "examples").run() +HelloWorld("localhost:5672", "examples").run() Modified: qpid/proton/branches/examples/tutorial/helloworld_alt.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_alt.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/helloworld_alt.py (original) +++ qpid/proton/branches/examples/tutorial/helloworld_alt.py Wed Oct 1 17:37:09 2014 @@ -19,19 +19,18 @@ # from proton import Message -from proton_events import EventLoop, IncomingMessageHandler +import proton_events -class HelloWorld(IncomingMessageHandler): - def __init__(self, eventloop, url, address): - self.eventloop = eventloop - self.conn = eventloop.connect(url, handler=self) +class HelloWorld(proton_events.BaseHandler): + def __init__(self, conn, address): + self.conn = conn self.address = address def on_connection_remote_open(self, event): self.conn.receiver(self.address) self.conn.sender(self.address) - def on_link_flow(self, event): + def on_credit(self, event): event.link.send_msg(Message(body=u"Hello World!")) event.link.close() @@ -39,19 +38,7 @@ class HelloWorld(IncomingMessageHandler) print event.message.body event.connection.close() - def on_link_remote_close(self, event): - 
self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error - self.conn.close() - - def run(self): - self.eventloop.run() - -HelloWorld(EventLoop(), "localhost:5672", "examples").run() +conn = proton_events.connect("localhost:5672") +conn.handler=HelloWorld(conn, "examples") +proton_events.run() Modified: qpid/proton/branches/examples/tutorial/helloworld_direct.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_direct.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/helloworld_direct.py (original) +++ qpid/proton/branches/examples/tutorial/helloworld_direct.py Wed Oct 1 17:37:09 2014 @@ -19,19 +19,19 @@ # from proton import Message -from proton_events import EventLoop, FlowController, Handshaker, IncomingMessageHandler +from proton_events import ErrorHandler, EventLoop, FlowController, Handshaker, IncomingMessageHandler, OutgoingMessageHandler class HelloWorldReceiver(IncomingMessageHandler): def on_message(self, event): print event.message.body event.connection.close() -class HelloWorldSender(object): - def on_link_flow(self, event): +class HelloWorldSender(OutgoingMessageHandler): + def on_credit(self, event): event.link.send_msg(Message(body=u"Hello World!")) event.link.close() -class HelloWorld(object): +class HelloWorld(ErrorHandler): def __init__(self, eventloop, url, address): self.eventloop = eventloop self.acceptor = eventloop.listen(url) @@ -41,15 +41,7 @@ class HelloWorld(object): def on_connection_remote_open(self, event): self.conn.sender(self.address, handler=HelloWorldSender()) - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - def on_connection_remote_close(self, event): - 
self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error self.conn.close() self.acceptor.close() Modified: qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py (original) +++ qpid/proton/branches/examples/tutorial/helloworld_direct_alt.py Wed Oct 1 17:37:09 2014 @@ -19,14 +19,14 @@ # from proton import Message -from proton_events import EventLoop, FlowController, Handshaker, IncomingMessageHandler +from proton_events import ErrorHandler, EventLoop, FlowController, Handshaker, IncomingMessageHandler, OutgoingMessageHandler class HelloWorldReceiver(IncomingMessageHandler): def on_message(self, event): print event.message.body event.connection.close() -class HelloWorld(object): +class HelloWorld(ErrorHandler, OutgoingMessageHandler): def __init__(self, eventloop, url, address): self.eventloop = eventloop self.acceptor = eventloop.listen(url) @@ -40,15 +40,7 @@ class HelloWorld(object): event.link.send_msg(Message(body=u"Hello World!")) event.link.close() - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error self.conn.close() self.acceptor.close() Modified: qpid/proton/branches/examples/tutorial/helloworld_simple.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/helloworld_simple.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/helloworld_simple.py (original) 
+++ qpid/proton/branches/examples/tutorial/helloworld_simple.py Wed Oct 1 17:37:09 2014 @@ -19,39 +19,20 @@ # from proton import Message -from proton_events import EventLoop, IncomingMessageHandler +import proton_events -class HelloWorldReceiver(IncomingMessageHandler): - def on_message(self, event): - print event.message.body - event.connection.close() - -class HelloWorldSender(object): - def on_link_flow(self, event): +class HelloWorld(proton_events.BaseHandler): + def on_credit(self, event): event.link.send_msg(Message(body=u"Hello World!")) event.link.close() -class HelloWorld(object): - def __init__(self, eventloop, url, address): - self.eventloop = eventloop - self.conn = eventloop.connect(url, handler=self) - self.address = address - self.conn.receiver(self.address, handler=HelloWorldReceiver()) - self.conn.sender(self.address, handler=HelloWorldSender()) - - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error - self.conn.close() + def on_message(self, event): + print event.message.body + event.connection.close() - def run(self): - self.eventloop.run() +conn = proton_events.connect("localhost:5672", handler=HelloWorld()) +conn.receiver("examples") +conn.sender("examples") +proton_events.run() -HelloWorld(EventLoop(), "localhost:5672", "examples").run() Modified: qpid/proton/branches/examples/tutorial/proton_events.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/proton_events.py (original) +++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Oct 1 17:37:09 2014 @@ -404,14 +404,17 @@ class SelectLoop(object): timeout = 0 if 
self.events.next_interval and (timeout is None or self.events.next_interval < timeout): timeout = self.events.next_interval - readable, writable, _ = select(reading, writing, [], timeout) + if reading or writing or timeout: + readable, writable, _ = select(reading, writing, [], timeout) - for s in readable: - s.readable() - for s in writable: - s.writable() + for s in readable: + s.readable() + for s in writable: + s.writable() - return bool(readable or writable) + return bool(readable or writable) + else: + return False class Handshaker(EventDispatcher): @@ -490,11 +493,54 @@ class ScopedDispatcher(EventDispatcher): for h in handlers: h(event) +class ErrorHandler(EventDispatcher): + def was_closed_by_peer(self, endpoint): + return endpoint.state & Endpoint.LOCAL_ACTIVE and endpoint.state & Endpoint.REMOTE_CLOSED + + def treat_as_error(self, endpoint): + return endpoint.remote_condition or self.was_closed_by_peer(endpoint) + + def print_error(self, endpoint, endpoint_type): + if endpoint.remote_condition: + print endpoint.remote_condition.description + elif self.was_closed_by_peer(endpoint): + print "%s closed by peer" % endpoint_type + + def on_link_remote_close(self, event): + if self.treat_as_error(event.link): + self.on_link_error(event) + + def on_session_remote_close(self, event): + if self.treat_as_error(event.session): + self.on_session_error(event) + + def on_connection_remote_close(self, event): + if self.treat_as_error(event.connection): + self.on_connection_error(event) + + def on_connection_error(self, event): + self.print_error(event.connection, "connection") + event.connection.close() + + def on_session_error(self, event): + self.print_error(event.session, "session") + event.session.close() + event.connection.close() + + def on_link_error(self, event): + self.print_error(event.link, "link") + event.link.close() + event.connection.close() + class OutgoingMessageHandler(EventDispatcher): + def on_link_flow(self, event): + if event.link.is_sender and 
event.link.credit: + self.on_credit(event) + def on_delivery(self, event): dlv = event.delivery link = dlv.link - if dlv.updated and not hasattr(dlv, "_been_settled"): + if link.is_sender and dlv.updated and not hasattr(dlv, "_been_settled"): if dlv.remote_state == Delivery.ACCEPTED: self.on_accepted(event) elif dlv.remote_state == Delivery.REJECTED: @@ -509,6 +555,7 @@ class OutgoingMessageHandler(EventDispat dlv._been_settled = True dlv.settle() + def on_credit(self, event): pass def on_accepted(self, event): pass def on_rejected(self, event): pass def on_released(self, event): pass @@ -566,6 +613,14 @@ class IncomingMessageHandler(EventDispat def on_settled(self, event): pass def auto_accept(self): return True +class BaseHandler(ErrorHandler, IncomingMessageHandler, OutgoingMessageHandler): + def __init__(self): + super(BaseHandler, self).__init__() + + def on_delivery(self, event): + IncomingMessageHandler.on_delivery(self, event) + OutgoingMessageHandler.on_delivery(self, event) + def delivery_tags(): count = 1 while True: @@ -591,6 +646,14 @@ class MessagingContext(object): self.conn._mc = self self.ssn = ssn + def _get_handler(self): + return self.conn.context + + def _set_handler(self, value): + self.conn.context = value + + handler = property(_get_handler, _set_handler) + def sender(self, target, source=None, name=None, handler=None, tags=None): snd = self._get_ssn().sender(name or self._get_id(target, source)) if source: @@ -670,6 +733,8 @@ class Connector(EventDispatcher): else: print "Disconnected will try to reconnect after %s seconds" % delay self.loop.schedule(time.time() + delay, connection=event.connection, subject=self) + else: + print "Disconnected" def on_timer(self, event): if event.subject == self and event.connection: @@ -781,6 +846,8 @@ class EventLoop(object): else: raise ValueError("One of url, urls or address required") if reconnect: context.conn.reconnect = reconnect + elif reconnect is None: + context.conn.reconnect = Backoff() 
context.conn.open() return context @@ -812,6 +879,18 @@ class EventLoop(object): def do_work(self, timeout=None): return self.loop.do_work(timeout) +EventLoop.DEFAULT = EventLoop() + +def connect(url=None, urls=None, address=None, handler=None, reconnect=None, eventloop=None): + if not eventloop: + eventloop = EventLoop.DEFAULT + return eventloop.connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect) + +def run(eventloop=None): + if not eventloop: + eventloop = EventLoop.DEFAULT + eventloop.run() + class BlockingLink(object): def __init__(self, connection, link): self.connection = connection Modified: qpid/proton/branches/examples/tutorial/simple_recv.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_recv.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/simple_recv.py (original) +++ qpid/proton/branches/examples/tutorial/simple_recv.py Wed Oct 1 17:37:09 2014 @@ -18,34 +18,16 @@ # under the License. 
# -import time -from proton_events import Backoff, EventLoop, IncomingMessageHandler - -class Recv(IncomingMessageHandler): - def __init__(self, eventloop, host, address): - self.eventloop = eventloop - self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff()) - self.conn.receiver(address) +import proton_events +class Recv(proton_events.BaseHandler): def on_message(self, event): print event.message.body - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error - self.conn.close() - - def run(self): - self.eventloop.run() - try: - Recv(EventLoop(), "localhost:5672", "examples").run() + conn = proton_events.connect("localhost:5672", handler=Recv()) + conn.receiver("examples") + proton_events.run() except KeyboardInterrupt: pass Modified: qpid/proton/branches/examples/tutorial/simple_send.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/simple_send.py?rev=1628781&r1=1628780&r2=1628781&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/simple_send.py (original) +++ qpid/proton/branches/examples/tutorial/simple_send.py Wed Oct 1 17:37:09 2014 @@ -18,54 +18,32 @@ # under the License. 
# -import time from proton import Message -from proton_events import Backoff, EventLoop, OutgoingMessageHandler +import proton_events -class Send(OutgoingMessageHandler): - def __init__(self, eventloop, host, address, messages): - self.eventloop = eventloop +class Send(proton_events.BaseHandler): + def __init__(self, messages): self.sent = 0 self.confirmed = 0 self.total = messages - self.conn = self.eventloop.connect(host, handler=self, reconnect=Backoff()) - self.sender = self.conn.sender(address) - def on_link_flow(self, event): - for i in range(self.sender.credit): - if self.sent == self.total: - self.sender.drained() - break - msg = Message(body={'sequence':self.sent}) - self.sender.send_msg(msg, handler=self) + def on_credit(self, event): + while event.link.credit and self.sent < self.total: + msg = Message(body={'sequence':(self.sent+1)}) + event.link.send_msg(msg) self.sent += 1 def on_accepted(self, event): - """ - Stop the application once all of the messages are sent and acknowledged, - """ self.confirmed += 1 if self.confirmed == self.total: - self.sender.close() - self.conn.close() + print "all messages confirmed" + event.connection.close() - def on_connection_remote_open(self, event): + def on_disconnected(self, event): self.sent = self.confirmed - self.sender.offered(self.total - self.sent) - - def on_link_remote_close(self, event): - self.closed(event.link.remote_condition) - - def on_connection_remote_close(self, event): - self.closed(event.connection.remote_condition) - - def closed(self, error=None): - if error: - print "Closed due to %s" % error - self.conn.close() - - def run(self): - self.eventloop.run() - -Send(EventLoop(), "localhost:5672", "examples", 10000).run() +try: + conn = proton_events.connect("localhost:5672", handler=Send(10000)) + conn.sender("examples") + proton_events.run() +except KeyboardInterrupt: pass --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org