Repository: qpid-proton Updated Branches: refs/heads/examples 1c4d9ed89 -> 9c87b3db5
Remove MessagingContext, rename EventLoop as Container, move link creation to container, allow single step link creation if desired. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9c87b3db Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9c87b3db Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9c87b3db Branch: refs/heads/examples Commit: 9c87b3db5f27f9aeaa631e9e856fb6908d66f229 Parents: 1c4d9ed Author: Gordon Sim <g...@redhat.com> Authored: Wed Dec 3 12:06:07 2014 +0000 Committer: Gordon Sim <g...@redhat.com> Committed: Thu Dec 4 11:55:10 2014 +0000 ---------------------------------------------------------------------- examples/engine/py/client.py | 10 +- examples/engine/py/client_http.py | 6 +- examples/engine/py/db_recv.py | 14 +- examples/engine/py/db_send.py | 18 +- examples/engine/py/helloworld.py | 10 +- examples/engine/py/helloworld_direct.py | 14 +- examples/engine/py/helloworld_direct_tornado.py | 8 +- examples/engine/py/helloworld_tornado.py | 8 +- examples/engine/py/proton_server.py | 18 +- examples/engine/py/proton_tornado.py | 4 +- examples/engine/py/recurring_timer.py | 22 +- examples/engine/py/recurring_timer_tornado.py | 22 +- examples/engine/py/selected_recv.py | 8 +- examples/engine/py/server.py | 13 +- examples/engine/py/server_tx.py | 18 +- examples/engine/py/simple_recv.py | 12 +- examples/engine/py/simple_send.py | 12 +- examples/engine/py/tx_recv.py | 19 +- examples/engine/py/tx_recv_interactive.py | 16 +- examples/engine/py/tx_send.py | 19 +- examples/engine/py/tx_send_sync.py | 21 +- proton-c/bindings/python/proton/reactors.py | 216 +++++++++---------- proton-c/bindings/python/proton/utils.py | 30 ++- 23 files changed, 254 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/client.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/client.py b/examples/engine/py/client.py index a649dec..d1e2706 100755 --- a/examples/engine/py/client.py +++ b/examples/engine/py/client.py @@ -20,7 +20,7 @@ from proton import Message from proton.handlers import MessagingHandler -from proton.reactors import EventLoop +from proton.reactors import Container class Client(MessagingHandler): def __init__(self, host, address, requests): @@ -30,9 +30,9 @@ class Client(MessagingHandler): self.requests = requests def on_start(self, event): - self.conn = event.reactor.connect(self.host) - self.sender = self.conn.create_sender(self.address) - self.receiver = self.conn.create_receiver(None, dynamic=True) + self.conn = event.container.connect(self.host) + self.sender = event.container.create_sender(self.conn, self.address) + self.receiver = event.container.create_receiver(self.conn, None, dynamic=True) def next_request(self): if self.receiver.remote_source.address: @@ -55,5 +55,5 @@ REQUESTS= ["Twas brillig, and the slithy toves", "All mimsy were the borogroves,", "And the mome raths outgrabe."] -EventLoop(Client("localhost:5672", "examples", REQUESTS)).run() +Container(Client("localhost:5672", "examples", REQUESTS)).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/client_http.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/client_http.py b/examples/engine/py/client_http.py index ab7b1cd..5202f8d 100755 --- a/examples/engine/py/client_http.py +++ b/examples/engine/py/client_http.py @@ -36,9 +36,9 @@ class Client(MessagingHandler): self.receiver = None def on_start(self, event): - context = event.reactor.connect(self.host) - self.sender = context.create_sender(self.address) - self.receiver = context.create_receiver(None, dynamic=True) + conn = event.container.connect(self.host) + self.sender = event.container.create_sender(conn, self.address) + self.receiver = event.container.create_receiver(conn, None, dynamic=True) def on_link_opened(self, event): if event.receiver == self.receiver: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/db_recv.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/db_recv.py b/examples/engine/py/db_recv.py index 5779403..8b4490d 100755 --- a/examples/engine/py/db_recv.py +++ b/examples/engine/py/db_recv.py @@ -19,22 +19,20 @@ # from proton.handlers import MessagingHandler -from proton.reactors import ApplicationEvent, EventLoop +from proton.reactors import ApplicationEvent, Container from db_common import Db class Recv(MessagingHandler): - def __init__(self, host, address): + def __init__(self, url): super(Recv, self).__init__(auto_accept=False) - self.host = host - self.address = address + self.url = url self.delay = 0 # TODO: load last tag from db self.last_id = None def on_start(self, event): - self.db = Db("dst_db", event.reactor.get_event_trigger()) - context = event.reactor.connect(self.host) - context.create_receiver(self.address) + self.db = Db("dst_db", event.container.get_event_trigger()) + event.container.create_receiver(self.url) def on_record_inserted(self, event): self.accept(event.delivery) @@ -49,7 +47,7 @@ class Recv(MessagingHandler): self.accept(event.delivery) try: - EventLoop(Recv("localhost:5672", "examples")).run() + Container(Recv("localhost:5672/examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/db_send.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/db_send.py b/examples/engine/py/db_send.py index b3a26fd..bc8d6da 100755 --- a/examples/engine/py/db_send.py +++ b/examples/engine/py/db_send.py @@ -22,29 +22,27 @@ import Queue import time from proton import Message from proton.handlers import MessagingHandler -from proton.reactors import ApplicationEvent, EventLoop +from proton.reactors import ApplicationEvent, Container from db_common import Db class Send(MessagingHandler): - def __init__(self, host, address): + def __init__(self, url): super(Send, self).__init__() - self.host = host - self.address = address + self.url = url self.delay = 0 self.sent = 0 self.records = Queue.Queue(maxsize=50) def on_start(self, event): - self.eventloop = event.reactor - self.db = Db("src_db", event.reactor.get_event_trigger()) - context = event.reactor.connect(self.host) - self.sender = context.create_sender(self.address) + self.container = event.container + self.db = Db("src_db", self.container.get_event_trigger()) + self.sender = self.container.create_sender(self.url) def on_records_loaded(self, event): if self.records.empty() and event.subject == self.sent: print "Exhausted available data, waiting to recheck..." # check for new data after 5 seconds - self.eventloop.schedule(time.time() + 5, link=self.sender, subject="data") + self.container.schedule(time.time() + 5, link=self.sender, subject="data") else: self.send() @@ -78,6 +76,6 @@ class Send(MessagingHandler): self.request_records() try: - EventLoop(Send("localhost:5672", "examples")).run() + Container(Send("localhost:5672/examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/helloworld.py b/examples/engine/py/helloworld.py index 5aa1482..92d6083 100755 --- a/examples/engine/py/helloworld.py +++ b/examples/engine/py/helloworld.py @@ -20,7 +20,7 @@ from proton import Message from proton.handlers import MessagingHandler -from proton.reactors import EventLoop +from proton.reactors import Container class HelloWorld(MessagingHandler): def __init__(self, server, address): @@ -29,9 +29,9 @@ class HelloWorld(MessagingHandler): self.address = address def on_start(self, event): - ctxt = event.reactor.connect(self.server) - ctxt.create_receiver(self.address) - ctxt.create_sender(self.address) + conn = event.container.connect(self.server) + event.container.create_receiver(conn, self.address) + event.container.create_sender(conn, self.address) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) @@ -41,5 +41,5 @@ class HelloWorld(MessagingHandler): print event.message.body event.connection.close() -EventLoop(HelloWorld("localhost:5672", "examples")).run() +Container(HelloWorld("localhost:5672", "examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld_direct.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/helloworld_direct.py b/examples/engine/py/helloworld_direct.py index 35ac597..c961fe5 100755 --- a/examples/engine/py/helloworld_direct.py +++ b/examples/engine/py/helloworld_direct.py @@ -20,18 +20,16 @@ from proton import Message from proton.handlers import MessagingHandler -from proton.reactors import EventLoop +from proton.reactors import Container class HelloWorld(MessagingHandler): - def __init__(self, server, address): + def __init__(self, url): super(HelloWorld, self).__init__() - self.server = server - self.address = address + self.url = url def on_start(self, event): - self.acceptor = event.reactor.listen(self.server) - ctxt = event.reactor.connect(self.server) - ctxt.create_sender(self.address) + self.acceptor = event.container.listen(self.url) + event.container.create_sender(self.url) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) @@ -46,4 +44,4 @@ class HelloWorld(MessagingHandler): def on_connection_closed(self, event): self.acceptor.close() -EventLoop(HelloWorld("localhost:8888", "examples")).run() +Container(HelloWorld("localhost:8888/examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld_direct_tornado.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/helloworld_direct_tornado.py b/examples/engine/py/helloworld_direct_tornado.py index 45926c6..8873357 100755 --- a/examples/engine/py/helloworld_direct_tornado.py +++ b/examples/engine/py/helloworld_direct_tornado.py @@ -29,10 +29,10 @@ class HelloWorld(MessagingHandler): self.address = address def on_start(self, event): - self.eventloop = event.reactor - self.acceptor = event.reactor.listen(self.server) - ctxt = event.reactor.connect(self.server) - ctxt.create_sender(self.address) + self.eventloop = event.container + self.acceptor = event.container.listen(self.server) + conn = event.container.connect(self.server) + event.container.create_sender(conn, self.address) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/helloworld_tornado.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/helloworld_tornado.py b/examples/engine/py/helloworld_tornado.py index 6a82b69..f7d4c26 100755 --- a/examples/engine/py/helloworld_tornado.py +++ b/examples/engine/py/helloworld_tornado.py @@ -29,10 +29,10 @@ class HelloWorld(MessagingHandler): self.address = address def on_start(self, event): - self.eventloop = event.reactor - ctxt = event.reactor.connect(self.server) - ctxt.create_receiver(self.address) - ctxt.create_sender(self.address) + self.eventloop = event.container + conn = event.container.connect(self.server) + event.container.create_receiver(conn, self.address) + event.container.create_sender(conn, self.address) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/proton_server.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/proton_server.py b/examples/engine/py/proton_server.py index b2e2027..8a5077b 100644 --- a/examples/engine/py/proton_server.py +++ b/examples/engine/py/proton_server.py @@ -18,15 +18,15 @@ # from proton import Message -from proton.reactors import EventLoop -from proton.handlers import FlowController, IncomingMessageHandler +from proton.reactors import Container +from proton.handlers import MessagingHandler -class Server(IncomingMessageHandler): +class Server(MessagingHandler): def __init__(self, host, address): super(Server, self).__init__() - self.eventloop = EventLoop(self, FlowController(10)) - self.conn = self.eventloop.connect(host) - self.receiver = self.conn.create_receiver(address) + self.container = Container(self) + self.conn = self.container.connect(host) + self.receiver = self.container.create_receiver(self.conn, address) self.senders = {} self.relay = None @@ -35,21 +35,21 @@ class Server(IncomingMessageHandler): def on_connection_open(self, event): if event.connection.remote_offered_capabilities and "ANONYMOUS-RELAY" in event.connection.remote_offered_capabilities: - self.relay = self.conn.create_sender(None) + self.relay = self.container.create_sender(self.conn, None) def on_connection_close(self, endpoint, error): if error: print "Closed due to %s" % error self.conn.close() def run(self): - self.eventloop.run() + self.container.run() def send(self, response, reply_to): sender = self.relay if not sender: sender = self.senders.get(reply_to) if not sender: - sender = self.conn.create_sender(reply_to) + sender = self.container.create_sender(self.conn, reply_to) self.senders[reply_to] = sender msg = Message(body=response) if self.relay: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/proton_tornado.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/proton_tornado.py b/examples/engine/py/proton_tornado.py index e49b28e..cfe7d6f 100644 --- a/examples/engine/py/proton_tornado.py +++ b/examples/engine/py/proton_tornado.py @@ -18,10 +18,10 @@ # under the License. # -from proton.reactors import ApplicationEvent, EventLoop, StartEvent +from proton.reactors import ApplicationEvent, Container, StartEvent import tornado.ioloop -class TornadoLoop(EventLoop): +class TornadoLoop(Container): def __init__(self, *handlers): super(TornadoLoop, self).__init__(*handlers) self.loop = tornado.ioloop.IOLoop.current() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/recurring_timer.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/recurring_timer.py b/examples/engine/py/recurring_timer.py index c641ec6..de530d3 100755 --- a/examples/engine/py/recurring_timer.py +++ b/examples/engine/py/recurring_timer.py @@ -19,29 +19,25 @@ # import time -from proton.reactors import EventLoop, Handler +from proton.reactors import Container, Handler class Recurring(Handler): def __init__(self, period): - self.eventloop = EventLoop(self) self.period = period - self.eventloop.schedule(time.time() + self.period, subject=self) + + def on_start(self, event): + self.container = event.container + self.container.schedule(time.time() + self.period, subject=self) def on_timer(self, event): print "Tick..." - self.eventloop.schedule(time.time() + self.period, subject=self) - - def run(self): - self.eventloop.run() - - def stop(self): - self.eventloop.stop() + self.container.schedule(time.time() + self.period, subject=self) try: - app = Recurring(1.0) - app.run() + container = Container(Recurring(1.0)) + container.run() except KeyboardInterrupt: - app.stop() + container.stop() print http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/recurring_timer_tornado.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/recurring_timer_tornado.py b/examples/engine/py/recurring_timer_tornado.py index f4ca260..aeeb20c 100755 --- a/examples/engine/py/recurring_timer_tornado.py +++ b/examples/engine/py/recurring_timer_tornado.py @@ -19,30 +19,26 @@ # import time -from proton import Handler +from proton.reactors import Handler from proton_tornado import TornadoLoop class Recurring(Handler): def __init__(self, period): - self.eventloop = TornadoLoop(self) self.period = period - self.eventloop.schedule(time.time() + self.period, subject=self) + + def on_start(self, event): + self.container = event.container + self.container.schedule(time.time() + self.period, subject=self) def on_timer(self, event): print "Tick..." - self.eventloop.schedule(time.time() + self.period, subject=self) - - def run(self): - self.eventloop.run() - - def stop(self): - self.eventloop.stop() + self.container.schedule(time.time() + self.period, subject=self) try: - app = Recurring(1.0) - app.run() + container = TornadoLoop(Recurring(1.0)) + container.run() except KeyboardInterrupt: - app.stop() + container.stop() print http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/selected_recv.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/selected_recv.py b/examples/engine/py/selected_recv.py index 8425f3d..d0df3b5 100755 --- a/examples/engine/py/selected_recv.py +++ b/examples/engine/py/selected_recv.py @@ -18,7 +18,7 @@ # under the License. # -from proton.reactors import EventLoop, Selector +from proton.reactors import Container, Selector from proton.handlers import MessagingHandler class Recv(MessagingHandler): @@ -26,14 +26,14 @@ class Recv(MessagingHandler): super(Recv, self).__init__() def on_start(self, event): - conn = event.reactor.connect("localhost:5672") - conn.create_receiver("examples", options=Selector(u"colour = 'green'")) + conn = event.container.connect("localhost:5672") + event.container.create_receiver(conn, "examples", options=Selector(u"colour = 'green'")) def on_message(self, event): print event.message.body try: - EventLoop(Recv()).run() + Container(Recv()).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/server.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/server.py b/examples/engine/py/server.py index 6ab5671..3e6aad4 100755 --- a/examples/engine/py/server.py +++ b/examples/engine/py/server.py @@ -20,7 +20,7 @@ from proton import Message from proton.handlers import MessagingHandler -from proton.reactors import EventLoop +from proton.reactors import Container class Server(MessagingHandler): def __init__(self, host, address): @@ -29,26 +29,27 @@ class Server(MessagingHandler): self.address = address def on_start(self, event): - self.conn = event.reactor.connect(self.host) - self.receiver = self.conn.create_receiver(self.address) + self.container = event.container + self.conn = event.container.connect(self.host) + self.receiver = event.container.create_receiver(self.conn, self.address) self.senders = {} self.relay = None def on_connection_opened(self, event): if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: - self.relay = self.conn.create_sender(None) + self.relay = self.container.create_sender(self.conn, None) def on_message(self, event): sender = self.relay if not sender: sender = self.senders.get(event.message.reply_to) if not sender: - sender = self.conn.create_sender(event.message.reply_to) + sender = self.container.create_sender(self.conn, event.message.reply_to) self.senders[event.message.reply_to] = sender sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper())) try: - EventLoop(Server("localhost:5672", "examples")).run() + Container(Server("localhost:5672", "examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/server_tx.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/server_tx.py b/examples/engine/py/server_tx.py index cda2d0b..0305a3f 100755 --- a/examples/engine/py/server_tx.py +++ b/examples/engine/py/server_tx.py @@ -19,16 +19,15 @@ # from proton import Message -from proton.reactors import EventLoop +from proton.reactors import Container from proton.handlers import MessagingHandler, TransactionHandler class TxRequest(TransactionHandler): - def __init__(self, response, sender, request_delivery, context): + def __init__(self, response, sender, request_delivery): super(TxRequest, self).__init__() self.response = response self.sender = sender self.request_delivery = request_delivery - self.context = context def on_transaction_declared(self, event): self.sender.send_msg(self.response, transaction=event.transaction) @@ -49,8 +48,9 @@ class TxServer(MessagingHandler): self.address = address def on_start(self, event): - self.context = event.reactor.connect(self.host, reconnect=False) - self.receiver = self.context.create_receiver(self.address) + self.container = event.container + self.conn = event.container.connect(self.host, reconnect=False) + self.receiver = event.container.create_receiver(self.conn, self.address) self.senders = {} self.relay = None @@ -59,18 +59,18 @@ class TxServer(MessagingHandler): if not sender: sender = self.senders.get(event.message.reply_to) if not sender: - sender = self.context.create_sender(event.message.reply_to) + sender = self.container.create_sender(self.conn, event.message.reply_to) self.senders[event.message.reply_to] = sender response = Message(address=event.message.reply_to, body=event.message.body.upper()) - self.context.declare_transaction(handler=TxRequest(response, sender, event.delivery, self.context)) + self.container.declare_transaction(self.conn, handler=TxRequest(response, sender, event.delivery)) def on_connection_open(self, event): if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: - self.relay = self.context.create_sender(None) + self.relay = self.container.create_sender(self.conn, None) try: - EventLoop(TxServer("localhost:5672", "examples")).run() + Container(TxServer("localhost:5672", "examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/simple_recv.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/simple_recv.py b/examples/engine/py/simple_recv.py index ea80aa6..6825c86 100755 --- a/examples/engine/py/simple_recv.py +++ b/examples/engine/py/simple_recv.py @@ -19,23 +19,21 @@ # from proton.handlers import MessagingHandler -from proton.reactors import EventLoop +from proton.reactors import Container class Recv(MessagingHandler): - def __init__(self, host, address): + def __init__(self, url): super(Recv, self).__init__() - self.host = host - self.address = address + self.url = url def on_start(self, event): - conn = event.reactor.connect(self.host) - conn.create_receiver(self.address) + event.container.create_receiver(self.url) def on_message(self, event): print event.message.body try: - EventLoop(Recv("localhost:5672", "examples")).run() + Container(Recv("localhost:5672/examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/simple_send.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/simple_send.py b/examples/engine/py/simple_send.py index bbd30ac..21530ef 100755 --- a/examples/engine/py/simple_send.py +++ b/examples/engine/py/simple_send.py @@ -20,20 +20,18 @@ from proton import Message from proton.handlers import MessagingHandler -from proton.reactors import EventLoop +from proton.reactors import Container class Send(MessagingHandler): - def __init__(self, host, address, messages): + def __init__(self, url, messages): super(Send, self).__init__() - self.host = host - self.address = address + self.url = url self.sent = 0 self.confirmed = 0 self.total = messages def on_start(self, event): - conn = event.reactor.connect(self.host) - conn.create_sender(self.address) + event.container.create_sender(self.url) def on_credit(self, event): while event.sender.credit and self.sent < self.total: @@ -51,5 +49,5 @@ class Send(MessagingHandler): self.sent = self.confirmed try: - EventLoop(Send("localhost:5672", "examples", 10000)).run() + Container(Send("localhost:5672/examples", 10000)).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_recv.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/tx_recv.py b/examples/engine/py/tx_recv.py index a28a3df..fc4bb8a 100755 --- a/examples/engine/py/tx_recv.py +++ b/examples/engine/py/tx_recv.py @@ -18,7 +18,7 @@ # under the License. # -from proton.reactors import EventLoop +from proton.reactors import Container from proton.handlers import TransactionalClientHandler class TxRecv(TransactionalClientHandler): @@ -26,10 +26,12 @@ class TxRecv(TransactionalClientHandler): super(TxRecv, self).__init__(prefetch=0) self.current_batch = 0 self.batch_size = batch_size - self.event_loop = EventLoop(self) - self.conn = self.event_loop.connect("localhost:5672") - self.receiver = self.conn.create_receiver("examples") - self.conn.declare_transaction(handler=self) + + def on_start(self, event): + self.container = event.container + self.conn = self.container.connect("localhost:5672") + self.receiver = self.container.create_receiver(self.conn, "examples") + self.container.declare_transaction(self.conn, handler=self) self.transaction = None def on_message(self, event): @@ -46,16 +48,13 @@ class TxRecv(TransactionalClientHandler): def on_transaction_committed(self, event): self.current_batch = 0 - self.conn.declare_transaction(handler=self) + self.container.declare_transaction(self.conn, handler=self) def on_disconnected(self, event): self.current_batch = 0 - def run(self): - self.event_loop.run() - try: - TxRecv(10).run() + Container(TxRecv(10)).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_recv_interactive.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/tx_recv_interactive.py b/examples/engine/py/tx_recv_interactive.py index a822992..6eb320e 100755 --- a/examples/engine/py/tx_recv_interactive.py +++ b/examples/engine/py/tx_recv_interactive.py @@ -20,7 +20,7 @@ import sys import threading -from proton.reactors import ApplicationEvent, EventLoop +from proton.reactors import ApplicationEvent, Container from proton.handlers import TransactionalClientHandler class TxRecv(TransactionalClientHandler): @@ -28,10 +28,10 @@ class TxRecv(TransactionalClientHandler): super(TxRecv, self).__init__(prefetch=0) def on_start(self, event): - self.context = event.reactor.connect("localhost:5672") - self.receiver = self.context.create_receiver("examples") - #self.context.declare_transaction(handler=self, settle_before_discharge=False) - self.context.declare_transaction(handler=self, settle_before_discharge=True) + self.container = event.container + self.conn = self.container.connect("localhost:5672") + self.receiver = self.container.create_receiver(self.conn, "examples") + self.container.declare_transaction(self.conn, handler=self, settle_before_discharge=True) self.transaction = None def on_message(self, event): @@ -44,11 +44,11 @@ class TxRecv(TransactionalClientHandler): def on_transaction_committed(self, event): print "transaction committed" - self.context.declare_transaction(handler=self) + self.container.declare_transaction(self.conn, handler=self) def on_transaction_aborted(self, event): print "transaction aborted" - self.context.declare_transaction(handler=self) + self.container.declare_transaction(self.conn, handler=self) def on_commit(self, event): self.transaction.commit() @@ -65,7 +65,7 @@ class TxRecv(TransactionalClientHandler): c.close() try: - reactor = EventLoop(TxRecv()) + reactor = Container(TxRecv()) events = reactor.get_event_trigger() thread = threading.Thread(target=reactor.run) thread.daemon=True http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_send.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/tx_send.py b/examples/engine/py/tx_send.py index b2f12b2..5b11280 100755 --- a/examples/engine/py/tx_send.py +++ b/examples/engine/py/tx_send.py @@ -19,7 +19,7 @@ # from proton import Message -from proton.reactors import EventLoop +from proton.reactors import Container from proton.handlers import TransactionalClientHandler class TxSend(TransactionalClientHandler): @@ -30,10 +30,12 @@ class TxSend(TransactionalClientHandler): self.confirmed = 0 self.total = messages self.batch_size = batch_size - self.eventloop = EventLoop() - self.conn = self.eventloop.connect("localhost:5672", handler=self) - self.sender = self.conn.create_sender("examples") - self.conn.declare_transaction(handler=self) + + def on_start(self, event): + self.container = event.container + self.conn = self.container.connect("localhost:5672", handler=self) + self.sender = self.container.create_sender(self.conn, "examples") + self.container.declare_transaction(self.conn, handler=self) self.transaction = None def on_transaction_declared(self, event): @@ -63,14 +65,11 @@ class TxSend(TransactionalClientHandler): event.connection.close() else: self.current_batch = 0 - self.conn.declare_transaction(handler=self) + self.container.declare_transaction(self.conn, handler=self) def on_disconnected(self, event): self.current_batch = 0 - def run(self): - self.eventloop.run() - try: - TxSend(10000, 10).run() + Container(TxSend(10000, 10)).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/examples/engine/py/tx_send_sync.py ---------------------------------------------------------------------- diff --git a/examples/engine/py/tx_send_sync.py b/examples/engine/py/tx_send_sync.py index 0c50838..c051408 100755 --- a/examples/engine/py/tx_send_sync.py +++ b/examples/engine/py/tx_send_sync.py @@ -19,21 +19,23 @@ # from proton import Message -from proton.reactors import EventLoop +from proton.reactors import Container from proton.handlers import TransactionalClientHandler class TxSend(TransactionalClientHandler): def __init__(self, messages, batch_size): super(TxSend, self).__init__() self.current_batch = 0 - self.confirmed = 0 self.committed = 0 + self.confirmed = 0 self.total = messages self.batch_size = batch_size - self.eventloop = EventLoop() - self.conn = self.eventloop.connect("localhost:5672", handler=self) - self.sender = self.conn.create_sender("examples") - self.conn.declare_transaction(handler=self) + + def on_start(self, event): + self.container = event.container + self.conn = self.container.connect("localhost:5672", handler=self) + self.sender = self.container.create_sender(self.conn, "examples") + self.container.declare_transaction(self.conn, handler=self) self.transaction = None def on_transaction_declared(self, event): @@ -64,14 +66,11 @@ class TxSend(TransactionalClientHandler): event.connection.close() else: self.current_batch = 0 - self.conn.declare_transaction(handler=self) + self.container.declare_transaction(self.conn, handler=self) def on_disconnected(self, event): self.current_batch = 0 - def run(self): - self.eventloop.run() - try: - TxSend(10000, 10).run() + Container(TxSend(10000, 10)).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/proton-c/bindings/python/proton/reactors.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactors.py b/proton-c/bindings/python/proton/reactors.py index fc10860..16a87e4 100644 --- a/proton-c/bindings/python/proton/reactors.py +++ b/proton-c/bindings/python/proton/reactors.py @@ -18,7 +18,7 @@ # import heapq, os, Queue, socket, time, types from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url -from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout +from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Session, Terminus, Timeout from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException from select import select from proton.handlers import nested_handlers, ScopedHandler @@ -310,9 +310,9 @@ class ApplicationEvent(Event): ", ".join([str(o) for o in objects if o is not None])) class StartEvent(ApplicationEvent): - def __init__(self, reactor): + def __init__(self, container): super(StartEvent, self).__init__("start") - self.reactor = reactor + self.container = container class ScheduledEvents(Events): """ @@ -576,100 +576,38 @@ def _apply_link_options(options, link): else: if options.test(link): options.apply(link) +def _create_session(connection, handler=None): + session = connection.session() + session.open() + return session -class MessagingContext(object): - """ - A context for creating links. This allows the user to ignore - sessions unless they explicitly want to control them. Additionally - provides support for transactional messaging. - """ - def __init__(self, conn, handler=None, ssn=None): - self.conn = conn - if handler: - self.conn.context = handler - self.conn._mc = self - self.ssn = ssn - self.txn_ctrl = None - - def _get_handler(self): - return self.conn.context - def _set_handler(self, value): - self.conn.context = value - - handler = property(_get_handler, _set_handler) - - def create_sender(self, target, source=None, name=None, handler=None, tags=None, options=None): - snd = self._get_ssn().sender(name or self._get_id(target, source)) - if source: - snd.source.address = source - if target: - snd.target.address = target - if handler: - snd.context = handler - snd.tags = tags or delivery_tags() - snd.send_msg = types.MethodType(_send_msg, snd) - _apply_link_options(options, snd) - snd.open() - return snd - - def create_receiver(self, source, target=None, name=None, dynamic=False, handler=None, options=None): - rcv = self._get_ssn().receiver(name or self._get_id(source, target)) - if source: - rcv.source.address = source - if dynamic: - rcv.source.dynamic = True - if target: - rcv.target.address = target - if handler: - rcv.context = handler - _apply_link_options(options, rcv) - rcv.open() - return rcv - - def create_session(self): - return MessageContext(conn=None, ssn=self._new_ssn()) +def _get_attr(target, name): + if hasattr(target, name): + return getattr(target, name) + else: + return None - def declare_transaction(self, handler=None, settle_before_discharge=False): - if not self.txn_ctrl: - self.txn_ctrl = self.create_sender(None, name="txn-ctrl") - self.txn_ctrl.target.type = Terminus.COORDINATOR - self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) - return Transaction(self.txn_ctrl, handler, settle_before_discharge) +class SessionPerConnection(object): + def __init__(self): + self._default_session = None - def close(self): - if self.ssn: - self.ssn.close() - if self.conn: - self.conn.close() - - def _get_id(self, remote, local): - if local and remote: "%s-%s-%s" % (self.conn.container, remote, local) - elif local: return "%s-%s" % (self.conn.container, local) - elif remote: return "%s-%s" % (self.conn.container, remote) - else: return "%s-%s" % (self.conn.container, str(generate_uuid())) - - def _get_ssn(self): - if not self.ssn: - self.ssn = self._new_ssn() - self.ssn.context = self - return self.ssn - - def _new_ssn(self): - ssn = self.conn.session() - ssn.open() - return ssn + def session(self, connection): + if not self._default_session: + self._default_session = _create_session(connection) + self._default_session.context = self + return self._default_session def on_session_remote_close(self, event): - if self.conn: - self.conn.close() + event.connection.close() + self._default_session = None class Connector(Handler): """ Internal handler that triggers the necessary socket connect for an opened connection. """ - def attach_to(self, loop): + def __init__(self, loop): self.loop = loop def _connect(self, connection): @@ -677,6 +615,7 @@ class Connector(Handler): #print "connecting to %s:%i" % (host, port) heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None self.loop.add(AmqpSocket(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port)) + connection._pin = None #connection is now referenced by AmqpSocket, so no need for circular reference def on_connection_local_open(self, event): if hasattr(event.connection, "address"): @@ -688,6 +627,7 @@ class Connector(Handler): def on_disconnected(self, event): if hasattr(event.connection, "reconnect"): + event.connection._pin = event.connection #no longer referenced by AmqpSocket, so pin in memory with circular reference delay = event.connection.reconnect.next() if delay == 0: print "Disconnected, reconnecting..." @@ -739,31 +679,97 @@ class Urls(object): self.i = iter(self.values) return self._as_pair(self.i.next()) -class EventLoop(object): +class Container(object): def __init__(self, *handlers): - self.connector = Connector() - h = [self.connector, ScopedHandler()] + h = [Connector(self), ScopedHandler()] h.extend(nested_handlers(handlers)) self.events = ScheduledEvents(*h) self.loop = SelectLoop(self.events) - self.connector.attach_to(self) self.trigger = None self.container_id = str(generate_uuid()) def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None): - context = MessagingContext(self.events.connection(), handler=handler) - context.conn.container = self.container_id or str(generate_uuid()) - context.conn.heartbeat = heartbeat - if url: context.conn.address = Urls([url]) - elif urls: context.conn.address = Urls(urls) - elif address: context.conn.address = address + conn = self.events.connection() + conn._pin = conn #circular reference until the open event gets handled + if handler: + conn.context = handler + conn.container = self.container_id or str(generate_uuid()) + conn.heartbeat = heartbeat + if url: conn.address = Urls([url]) + elif urls: conn.address = Urls(urls) + elif address: conn.address = address else: raise ValueError("One of url, urls or address required") if reconnect: - context.conn.reconnect = reconnect + conn.reconnect = reconnect elif reconnect is None: - context.conn.reconnect = Backoff() - context.conn.open() - return context + conn.reconnect = Backoff() + conn._session_policy = SessionPerConnection() #todo: make configurable + conn.open() + return conn + + def _get_id(self, container, remote, local): + if local and remote: "%s-%s-%s" % (container, remote, local) + elif local: return "%s-%s" % (container, local) + elif remote: return "%s-%s" % (container, remote) + else: return "%s-%s" % (container, str(generate_uuid())) + + def _get_session(self, context): + if isinstance(context, Url): + return self._get_session(self.connect(url=context)) + elif isinstance(context, Session): + return context + elif isinstance(context, Connection): + if hasattr(context, '_session_policy'): + return context._session_policy.session(context) + else: + return _create_session(context) + else: + return context.session() + + def create_sender(self, context, target=None, source=None, name=None, handler=None, tags=None, options=None): + if isinstance(context, basestring): + context = Url(context) + if isinstance(context, Url) and not target: + target = context.path + session = self._get_session(context) + snd = session.sender(name or self._get_id(session.connection.container, target, source)) + if source: + snd.source.address = source + if target: + snd.target.address = target + if handler: + snd.context = handler + snd.tags = tags or delivery_tags() + snd.send_msg = types.MethodType(_send_msg, snd) + _apply_link_options(options, snd) + snd.open() + return snd + + def create_receiver(self, context, source=None, target=None, name=None, dynamic=False, handler=None, options=None): + if isinstance(context, basestring): + context = Url(context) + if isinstance(context, Url) and not source: + source = context.path + session = self._get_session(context) + rcv = session.receiver(name or self._get_id(session.connection.container, source, target)) + if source: + rcv.source.address = source + if dynamic: + rcv.source.dynamic = True + if target: + rcv.target.address = target + if handler: + rcv.context = handler + _apply_link_options(options, rcv) + rcv.open() + return rcv + + def declare_transaction(self, context, handler=None, settle_before_discharge=False): + if not _get_attr(context, '_txn_ctrl'): + context._txn_ctrl = self.create_sender(context, None, name='txn-ctrl') + context._txn_ctrl.target.type = Terminus.COORDINATOR + context._txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) + return Transaction(context._txn_ctrl, handler, settle_before_discharge) def listen(self, url): host, port = Urls([url]).next() @@ -794,15 +800,3 @@ class EventLoop(object): def do_work(self, timeout=None): return self.loop.do_work(timeout) -EventLoop.DEFAULT = EventLoop() - -def connect(url=None, urls=None, address=None, handler=None, reconnect=None, eventloop=None, heartbeat=None): - if not eventloop: - eventloop = EventLoop.DEFAULT - return eventloop.connect(url=url, urls=urls, address=address, handler=handler, reconnect=reconnect, heartbeat=heartbeat) - -def run(eventloop=None): - if not eventloop: - eventloop = EventLoop.DEFAULT - eventloop.run() - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9c87b3db/proton-c/bindings/python/proton/utils.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py index 845c3ab..03c9417 100644 --- a/proton-c/bindings/python/proton/utils.py +++ b/proton-c/bindings/python/proton/utils.py @@ -17,8 +17,8 @@ # under the License. # import Queue, socket, time -from proton import ConnectionException, Endpoint, Handler, Message, Url -from proton.reactors import AmqpSocket, Events, MessagingContext, SelectLoop, send_msg +from proton import ConnectionException, Endpoint, Handler, Message, Timeout, Url +from proton.reactors import AmqpSocket, Container, Events, SelectLoop, send_msg from proton.handlers import ScopedHandler class BlockingLink(object): @@ -52,36 +52,32 @@ class BlockingConnection(Handler): """ A synchronous style connection wrapper. """ - def __init__(self, url, timeout=None): + def __init__(self, url, timeout=None, container=None): self.timeout = timeout - self.events = Events(ScopedHandler()) - self.loop = SelectLoop(self.events) - self.context = MessagingContext(self.loop.events.connection(), handler=self) + self.container = container or Container() if isinstance(url, basestring): self.url = Url(url) else: self.url = url - self.loop.add( - AmqpSocket(self.context.conn, socket.socket(), self.events).connect(self.url.host, self.url.port)) - self.context.conn.open() - self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_UNINIT), + self.conn = self.container.connect(url=self.url, handler=self) + self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT), msg="Opening connection") def create_sender(self, address, handler=None): - return BlockingSender(self, self.context.create_sender(address, handler=handler)) + return BlockingSender(self, self.container.create_sender(self.conn, address, handler=handler)) def create_receiver(self, address, credit=1, dynamic=False, handler=None): return BlockingReceiver( - self, self.context.create_receiver(address, dynamic=dynamic, handler=handler), credit=credit) + self, self.container.create_receiver(self.conn, address, dynamic=dynamic, handler=handler), credit=credit) def close(self): - self.context.conn.close() - self.wait(lambda: not (self.context.conn.state & Endpoint.REMOTE_ACTIVE), + self.conn.close() + self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE), msg="Closing connection") def run(self): """ Hand control over to the event loop (e.g. if waiting indefinitely for incoming messages) """ - self.loop.run() + self.container.run() def wait(self, condition, timeout=False, msg=None): """Call do_work until condition() is true""" @@ -89,11 +85,11 @@ class BlockingConnection(Handler): timeout = self.timeout if timeout is None: while not condition(): - self.loop.do_work() + self.container.do_work() else: deadline = time.time() + timeout while not condition(): - if not self.loop.do_work(deadline - time.time()): + if not self.container.do_work(deadline - time.time()): txt = "Connection %s timed out" % self.url if msg: txt += ": " + msg raise Timeout(txt) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org