Repository: qpid-proton Updated Branches: refs/heads/examples 020bf1694 -> 6aa4b4c8d
Some fixes and refactorings Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6aa4b4c8 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6aa4b4c8 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6aa4b4c8 Branch: refs/heads/examples Commit: 6aa4b4c8d108f2027bd0d9bd3247a616ae59366b Parents: 020bf16 Author: Gordon Sim <[email protected]> Authored: Wed Nov 19 17:23:09 2014 +0000 Committer: Gordon Sim <[email protected]> Committed: Wed Nov 19 17:23:09 2014 +0000 ---------------------------------------------------------------------- proton-c/bindings/python/proton.py | 2 +- tutorial/client.py | 33 +-- tutorial/client_http.py | 68 ++++-- tutorial/db_recv.py | 19 +- tutorial/db_send.py | 22 +- tutorial/helloworld.py | 17 +- tutorial/helloworld_alt.py | 48 ---- tutorial/helloworld_direct.py | 33 ++- tutorial/helloworld_direct_alt.py | 55 ----- tutorial/helloworld_direct_tornado.py | 34 ++- tutorial/helloworld_simple.py | 5 +- tutorial/helloworld_simplistic.py | 4 +- tutorial/helloworld_tornado.py | 23 +- tutorial/proton_events.py | 339 ++++++++++++++++++++--------- tutorial/proton_tornado.py | 13 +- tutorial/server.py | 29 +-- tutorial/server_tx.py | 35 ++- tutorial/simple_recv.py | 17 +- tutorial/simple_send.py | 21 +- tutorial/tx_recv.py | 3 +- tutorial/tx_send.py | 5 +- tutorial/tx_send_sync.py | 1 + 22 files changed, 446 insertions(+), 380 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/proton-c/bindings/python/proton.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton.py b/proton-c/bindings/python/proton.py index 7d98929..a8f0db6 100644 --- a/proton-c/bindings/python/proton.py +++ b/proton-c/bindings/python/proton.py @@ -3404,7 +3404,7 @@ def dispatch(handler, method, *args): elif hasattr(handler, "on_unhandled"): return handler.on_unhandled(method, args) -class Event: +class Event(object): CONNECTION_INIT = EventType(PN_CONNECTION_INIT, "on_connection_init") CONNECTION_BOUND = EventType(PN_CONNECTION_BOUND, "on_connection_bound") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/client.py ---------------------------------------------------------------------- diff --git a/tutorial/client.py b/tutorial/client.py index 578d06a..e3b705c 100755 --- a/tutorial/client.py +++ b/tutorial/client.py @@ -19,22 +19,28 @@ # from proton import Message -from proton_events import EventLoop, ClientHandler - -class Client(ClientHandler): - def __init__(self, eventloop, host, address, requests): - self.eventloop = eventloop - self.conn = eventloop.connect(host) - self.sender = self.conn.create_sender(address) - self.receiver = self.conn.create_receiver(None, dynamic=True, handler=self) +from proton_events import EventLoop, MessagingHandler + +class Client(MessagingHandler): + def __init__(self, host, address, requests): + super(Client, self).__init__() + self.host = host + self.address = address self.requests = requests + def on_start(self, event): + self.conn = event.reactor.connect(self.host) + self.sender = self.conn.create_sender(self.address) + self.receiver = self.conn.create_receiver(None, dynamic=True) + def next_request(self): - req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0]) - self.sender.send_msg(req) + if self.receiver.remote_source.address: + req = Message(reply_to=self.receiver.remote_source.address, body=self.requests[0]) + self.sender.send_msg(req) def on_link_opened(self, event): - self.next_request() + if event.receiver == self.receiver: + self.next_request() def on_message(self, event): print "%s => %s" % (self.requests.pop(0), event.message.body) @@ -43,13 +49,10 @@ class Client(ClientHandler): else: self.conn.close() - def run(self): - self.eventloop.run() - REQUESTS= ["Twas brillig, and the slithy toves", "Did gire and gymble in the wabe.", "All mimsy were the borogroves,", "And the mome raths outgrabe."] -Client(EventLoop(), "localhost:5672", "examples", REQUESTS).run() +EventLoop(Client("localhost:5672", "examples", REQUESTS)).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/client_http.py ---------------------------------------------------------------------- diff --git a/tutorial/client_http.py b/tutorial/client_http.py index f78cf39..afa0c78 100755 --- a/tutorial/client_http.py +++ b/tutorial/client_http.py @@ -19,14 +19,56 @@ # from proton import Message -from proton_events import ClientHandler +from proton_events import MessagingHandler from proton_tornado import TornadoLoop from tornado.ioloop import IOLoop import tornado.web -class ExampleHandler(tornado.web.RequestHandler, ClientHandler): - def initialize(self, loop): - self.loop = loop +class Client(MessagingHandler): + def __init__(self, host, address): + super(Client, self).__init__() + self.host = host + self.address = address + self.sent = [] + self.pending = [] + self.reply_address = None + self.sender = None + self.receiver = None + + def on_start(self, event): + context = event.reactor.connect(self.host) + self.sender = context.create_sender(self.address) + self.receiver = context.create_receiver(None, dynamic=True) + + def on_link_opened(self, event): + if event.receiver == self.receiver: + self.reply_address = event.link.remote_source.address + self.do_request() + + def on_credit(self, event): + self.do_request() + + def on_message(self, event): + if self.sent: + request, handler = self.sent.pop(0) + print "%s => %s" % (request, event.message.body) + handler(event.message.body) + self.do_request() + + def do_request(self): + if self.pending and self.reply_address and self.sender.credit: + request, handler = self.pending.pop(0) + self.sent.append((request, handler)) + req = Message(reply_to=self.reply_address, body=request) + self.sender.send_msg(req) + + def request(self, body, handler): + self.pending.append((body, handler)) + self.do_request() + +class ExampleHandler(tornado.web.RequestHandler): + def initialize(self, client): + self.client = client def get(self): self._write_open() @@ -35,22 +77,15 @@ class ExampleHandler(tornado.web.RequestHandler, ClientHandler): @tornado.web.asynchronous def post(self): - self.conn = self.loop.connect("localhost:5672") - self.sender = self.conn.create_sender("examples") - self.conn.create_receiver(None, dynamic=True, handler=self) + client.request(self.get_body_argument("message"), lambda x: self.on_response(x)) - def on_link_opened(self, event): - req = Message(reply_to=event.link.remote_source.address, body=self.get_body_argument("message")) - self.sender.send_msg(req) - - def on_message(self, event): + def on_response(self, body): self.set_header("Content-Type", "text/html") self._write_open() self._write_form() - self.write("Response: " + event.message.body) + self.write("Response: " + body) self._write_close() self.finish() - self.conn.close() def _write_open(self): self.write('<html><body>') @@ -65,8 +100,9 @@ class ExampleHandler(tornado.web.RequestHandler, ClientHandler): '</form>') -loop = TornadoLoop() -app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(loop=loop))]) +client = Client("localhost:5672", "examples") +loop = TornadoLoop(client) +app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, dict(client=client))]) app.listen(8888) try: loop.run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/db_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/db_recv.py b/tutorial/db_recv.py index 9a3a881..1dfb3e3 100755 --- a/tutorial/db_recv.py +++ b/tutorial/db_recv.py @@ -18,22 +18,22 @@ # under the License. # -from proton_events import ApplicationEvent, ClientHandler, EventLoop +from proton_events import ApplicationEvent, MessagingHandler, EventLoop from db_common import Db -class Recv(ClientHandler): +class Recv(MessagingHandler): def __init__(self, host, address): - self.eventloop = EventLoop() + super(Recv, self).__init__(auto_accept=False) self.host = host self.address = address self.delay = 0 - self.db = Db("dst_db", self.eventloop.get_event_trigger()) # TODO: load last tag from db self.last_id = None - self.conn = self.eventloop.connect(self.host, handler=self) - self.conn.create_receiver(self.address) - def auto_accept(self): return False + def on_start(self, event): + self.db = Db("dst_db", event.reactor.get_event_trigger()) + context = event.reactor.connect(self.host) + context.create_receiver(self.address) def on_record_inserted(self, event): self.accept(event.delivery) @@ -47,11 +47,8 @@ class Recv(ClientHandler): else: self.accept(event.delivery) - def run(self): - self.eventloop.run() - try: - Recv("localhost:5672", "examples").run() + EventLoop(Recv("localhost:5672", "examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/db_send.py ---------------------------------------------------------------------- diff --git a/tutorial/db_send.py b/tutorial/db_send.py index e15db19..4701fae 100755 --- a/tutorial/db_send.py +++ b/tutorial/db_send.py @@ -21,20 +21,23 @@ import Queue import time from proton import Message -from proton_events import ApplicationEvent, ClientHandler, EventLoop +from proton_events import ApplicationEvent, MessagingHandler, EventLoop from db_common import Db -class Send(ClientHandler): +class Send(MessagingHandler): def __init__(self, host, address): - self.eventloop = EventLoop() - self.address = address + super(Send, self).__init__() self.host = host + self.address = address self.delay = 0 self.sent = 0 self.records = Queue.Queue(maxsize=50) - self.db = Db("src_db", self.eventloop.get_event_trigger()) - self.conn = self.eventloop.connect(self.host, handler=self) - self.sender = self.conn.create_sender(self.address) + + def on_start(self, event): + self.eventloop = event.reactor + self.db = Db("src_db", event.reactor.get_event_trigger()) + context = event.reactor.connect(self.host) + self.sender = context.create_sender(self.address) def on_records_loaded(self, event): if self.records.empty() and event.subject == self.sent: @@ -73,10 +76,7 @@ class Send(ClientHandler): print "Rechecking for data..." self.request_records() - def run(self): - self.eventloop.run() - try: - Send("localhost:5672", "examples").run() + EventLoop(Send("localhost:5672", "examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld.py b/tutorial/helloworld.py index 9dfc356..a3965a8 100755 --- a/tutorial/helloworld.py +++ b/tutorial/helloworld.py @@ -19,16 +19,18 @@ # from proton import Message -import proton_events +from proton_events import EventLoop, MessagingHandler -class HelloWorld(proton_events.ClientHandler): +class HelloWorld(MessagingHandler): def __init__(self, server, address): + super(HelloWorld, self).__init__() + self.server = server self.address = address - self.conn = proton_events.connect(server, handler=self) - def on_connection_opened(self, event): - self.conn.create_receiver(self.address) - self.conn.create_sender(self.address) + def on_start(self, event): + ctxt = event.reactor.connect(self.server) + ctxt.create_receiver(self.address) + ctxt.create_sender(self.address) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) @@ -38,6 +40,5 @@ class HelloWorld(proton_events.ClientHandler): print event.message.body event.connection.close() -HelloWorld("localhost:5672", "examples") -proton_events.run() +EventLoop(HelloWorld("localhost:5672", "examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_alt.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_alt.py b/tutorial/helloworld_alt.py deleted file mode 100755 index 93898bb..0000000 --- a/tutorial/helloworld_alt.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from proton import Message -from proton_events import ClientEndpointHandler, EventLoop, IncomingMessageHandler, OutgoingMessageHandler - -class HelloWorldReceiver(IncomingMessageHandler): - def on_message(self, event): - print event.message.body - event.connection.close() - -class HelloWorldSender(OutgoingMessageHandler): - def on_credit(self, event): - event.link.send_msg(Message(body=u"Hello World!")) - event.link.close() - -class HelloWorld(ClientEndpointHandler): - def __init__(self, url, address): - self.eventloop = EventLoop() - self.conn = self.eventloop.connect(url, handler=self) - self.address = address - - def on_connection_opened(self, event): - self.conn.create_receiver(self.address, handler=HelloWorldReceiver()) - self.conn.create_sender(self.address, handler=HelloWorldSender()) - - def run(self): - self.eventloop.run() - -HelloWorld("localhost:5672", "examples").run() - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_direct.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_direct.py b/tutorial/helloworld_direct.py index d311cd1..fd70c0c 100755 --- a/tutorial/helloworld_direct.py +++ b/tutorial/helloworld_direct.py @@ -19,35 +19,30 @@ # from proton import Message -from proton_events import ClientHandler, EventLoop, FlowController, Handshaker, IncomingMessageHandler +from proton_events import EventLoop, MessagingHandler -class HelloWorldReceiver(IncomingMessageHandler): - def on_message(self, event): - print event.message.body - event.connection.close() - -class HelloWorld(ClientHandler): - def __init__(self, eventloop, url, address): - self.eventloop = eventloop - self.acceptor = eventloop.listen(url) - self.conn = eventloop.connect(url, handler=self) +class HelloWorld(MessagingHandler): + def __init__(self, server, address): + super(HelloWorld, self).__init__() + self.server = server self.address = address - def on_connection_opened(self, event): - self.conn.create_sender(self.address) + def on_start(self, event): + self.acceptor = event.reactor.listen(self.server) + ctxt = event.reactor.connect(self.server) + ctxt.create_sender(self.address) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) event.sender.close() + def on_message(self, event): + print event.message.body + def on_accepted(self, event): - self.conn.close() + event.connection.close() def on_connection_closed(self, event): self.acceptor.close() - def run(self): - self.eventloop.run() - -eventloop = EventLoop(HelloWorldReceiver(), Handshaker(), FlowController(1)) -HelloWorld(eventloop, "localhost:8888", "examples").run() +EventLoop(HelloWorld("localhost:8888", "examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_direct_alt.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_direct_alt.py b/tutorial/helloworld_direct_alt.py deleted file mode 100755 index b5fa381..0000000 --- a/tutorial/helloworld_direct_alt.py +++ /dev/null @@ -1,55 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from proton import Message -from proton_events import ClientEndpointHandler, EventLoop, FlowController, Handshaker, IncomingMessageHandler, OutgoingMessageHandler - -class HelloWorldReceiver(IncomingMessageHandler): - def on_message(self, event): - print event.message.body - event.connection.close() - -class HelloWorldSender(OutgoingMessageHandler): - def on_credit(self, event): - event.sender.send_msg(Message(body=u"Hello World!")) - event.sender.close() - - def on_accepted(self, event): - event.connection.close() - -class HelloWorld(ClientEndpointHandler): - def __init__(self, eventloop, url, address): - self.eventloop = eventloop - self.acceptor = eventloop.listen(url) - self.conn = eventloop.connect(url, handler=self) - self.address = address - - def on_connection_opened(self, event): - self.conn.create_sender(self.address, handler=HelloWorldSender()) - - def on_connection_closed(self, event): - self.acceptor.close() - - def run(self): - self.eventloop.run() - -eventloop = EventLoop(HelloWorldReceiver(), Handshaker(), FlowController(1)) -HelloWorld(eventloop, "localhost:8888", "examples").run() - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_direct_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_direct_tornado.py b/tutorial/helloworld_direct_tornado.py index 4635850..1982f18 100755 --- a/tutorial/helloworld_direct_tornado.py +++ b/tutorial/helloworld_direct_tornado.py @@ -19,38 +19,34 @@ # from proton import Message -from proton_events import ClientHandler, FlowController, Handshaker, IncomingMessageHandler +from proton_events import MessagingHandler from proton_tornado import TornadoLoop -class HelloWorldReceiver(IncomingMessageHandler): - def on_message(self, event): - print event.message.body - event.connection.close() - -class HelloWorld(ClientHandler): - def __init__(self, eventloop, url, address): - self.eventloop = eventloop - self.acceptor = eventloop.listen(url) - self.conn = eventloop.connect(url, handler=self) +class HelloWorld(MessagingHandler): + def __init__(self, server, address): + super(HelloWorld, self).__init__() + self.server = server self.address = address - def on_connection_opened(self, event): - self.conn.create_sender(self.address) + def on_start(self, event): + self.eventloop = event.reactor + self.acceptor = event.reactor.listen(self.server) + ctxt = event.reactor.connect(self.server) + ctxt.create_sender(self.address) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) event.sender.close() + def on_message(self, event): + print event.message.body + def on_accepted(self, event): - self.conn.close() + event.connection.close() def on_connection_closed(self, event): self.acceptor.close() self.eventloop.stop() - def run(self): - self.eventloop.run() - -eventloop = TornadoLoop(HelloWorldReceiver(), Handshaker(), FlowController(1)) -HelloWorld(eventloop, "localhost:8888", "examples").run() +TornadoLoop(HelloWorld("localhost:8888", "examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_simple.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_simple.py b/tutorial/helloworld_simple.py index da56457..35c4dbd 100755 --- a/tutorial/helloworld_simple.py +++ b/tutorial/helloworld_simple.py @@ -21,7 +21,10 @@ from proton import Message import proton_events -class HelloWorld(proton_events.ClientHandler): +class HelloWorld(proton_events.MessagingHandler): + def __init__(self): + super(HelloWorld, self).__init__() + def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) event.sender.close() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_simplistic.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_simplistic.py b/tutorial/helloworld_simplistic.py index 2f5cc86..31874be 100755 --- a/tutorial/helloworld_simplistic.py +++ b/tutorial/helloworld_simplistic.py @@ -19,14 +19,14 @@ # from proton import Message -from proton_events import EventLoop, IncomingMessageHandler +from proton_events import EventLoop, FlowController, IncomingMessageHandler class HelloWorldReceiver(IncomingMessageHandler): def on_message(self, event): print event.message.body event.connection.close() -eventloop = EventLoop() +eventloop = EventLoop(FlowController(1)) conn = eventloop.connect("localhost:5672") conn.create_receiver("examples", handler=HelloWorldReceiver()) sender = conn.create_sender("examples") http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/helloworld_tornado.py b/tutorial/helloworld_tornado.py index 2821eb6..be251ce 100755 --- a/tutorial/helloworld_tornado.py +++ b/tutorial/helloworld_tornado.py @@ -19,18 +19,20 @@ # from proton import Message -from proton_events import ClientHandler +from proton_events import MessagingHandler from proton_tornado import TornadoLoop -class HelloWorld(ClientHandler): - def __init__(self, eventloop, url, address): - self.eventloop = eventloop - self.conn = eventloop.connect(url, handler=self) +class HelloWorld(MessagingHandler): + def __init__(self, server, address): + super(HelloWorld, self).__init__() + self.server = server self.address = address - def on_connection_opened(self, event): - self.conn.create_receiver(self.address) - self.conn.create_sender(self.address) + def on_start(self, event): + self.eventloop = event.reactor + ctxt = event.reactor.connect(self.server) + ctxt.create_receiver(self.address) + ctxt.create_sender(self.address) def on_credit(self, event): event.sender.send_msg(Message(body=u"Hello World!")) @@ -43,8 +45,5 @@ class HelloWorld(ClientHandler): def on_connection_closed(self, event): self.eventloop.stop() - def run(self): - self.eventloop.run() - -HelloWorld(TornadoLoop(), "localhost:5672", "examples").run() +TornadoLoop(HelloWorld("localhost:5672", "examples")).run() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/proton_events.py ---------------------------------------------------------------------- diff --git a/tutorial/proton_events.py b/tutorial/proton_events.py index d80691b..a1faee1 100644 --- a/tutorial/proton_events.py +++ b/tutorial/proton_events.py @@ -17,7 +17,7 @@ # under the License. # import heapq, os, Queue, re, socket, time, types -from proton import generate_uuid, PN_ACCEPTED, SASL, symbol, ulong +from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException from select import select @@ -224,11 +224,25 @@ class EventInjector(object): def removed(self): pass def tick(self): return None +def nested_handlers(handlers): + # currently only allows for a single level of nesting + nested = [] + for h in handlers: + nested.append(h) + if hasattr(h, 'handlers'): + nested.extend(getattr(h, 'handlers')) + return nested + +def add_nested_handler(handler, nested): + if hasattr(handler, 'handlers'): + getattr(handler, 'handlers').append(nested) + else: + handler.handlers = [nested] class Events(object): - def __init__(self, *dispatchers): + def __init__(self, *handlers): self.collector = Collector() - self.dispatchers = dispatchers + self.handlers = handlers def connection(self): conn = Connection() @@ -245,8 +259,8 @@ class Events(object): return def dispatch(self, event): - for d in self.dispatchers: - event.dispatch(d) + for h in self.handlers: + event.dispatch(h) @property def next_interval(self): @@ -286,9 +300,14 @@ class ApplicationEvent(Event): return "%s(%s)" % (self.type.name, ", ".join([str(o) for o in objects if o is not None])) +class StartEvent(ApplicationEvent): + def __init__(self, reactor): + super(StartEvent, self).__init__("start") + self.reactor = reactor + class ScheduledEvents(Events): - def __init__(self, *dispatchers): - super(ScheduledEvents, self).__init__(*dispatchers) + def __init__(self, *handlers): + super(ScheduledEvents, self).__init__(*handlers) self._events = [] def schedule(self, deadline, event): @@ -464,11 +483,14 @@ class ScopedHandler(Handler): return objects = [getattr(event, attr) for attr in self.scopes.get(event.clazz, [])] targets = [getattr(o, "context") for o in objects if hasattr(o, "context")] - handlers = [getattr(t, event.type.method) for t in targets if hasattr(t, event.type.method)] + handlers = [getattr(t, event.type.method) for t in nested_handlers(targets) if hasattr(t, event.type.method)] for h in handlers: h(event) class OutgoingMessageHandler(Handler): + def __init__(self, auto_settle=True, delegate=None): + self.auto_settle = auto_settle + self.delegate = delegate def on_link_flow(self, event): if event.link.is_sender and event.link.credit: @@ -488,16 +510,32 @@ class OutgoingMessageHandler(Handler): self.on_modified(event) if dlv.settled: self.on_settled(event) - if self.auto_settle(): + if self.auto_settle: dlv.settle() - def on_credit(self, event): pass - def on_accepted(self, event): pass - def on_rejected(self, event): pass - def on_released(self, event): pass - def on_modified(self, event): pass - def on_settled(self, event): pass - def auto_settle(self): return True + def on_credit(self, event): + if self.delegate: + dispatch(self.delegate, 'on_credit', event) + + def on_accepted(self, event): + if self.delegate: + dispatch(self.delegate, 'on_accepted', event) + + def on_rejected(self, event): + if self.delegate: + dispatch(self.delegate, 'on_rejected', event) + + def on_released(self, event): + if self.delegate: + dispatch(self.delegate, 'on_released', event) + + def on_modified(self, event): + if self.delegate: + dispatch(self.delegate, 'on_modified', event) + + def on_settled(self, event): + if self.delegate: + dispatch(self.delegate, 'on_settled', event) def recv_msg(delivery): msg = Message() @@ -511,23 +549,7 @@ class Reject(ProtonException): """ pass -class IncomingMessageHandler(Handler): - def on_delivery(self, event): - dlv = event.delivery - if dlv.released or not dlv.link.is_receiver: return - if dlv.readable and not dlv.partial: - event.message = recv_msg(dlv) - try: - self.on_message(event) - if self.auto_accept(): - dlv.update(Delivery.ACCEPTED) - dlv.settle() - except Reject: - dlv.update(Delivery.REJECTED) - dlv.settle() - elif dlv.updated and dlv.settled: - self.on_settled(event) - +class Acking(object): def accept(self, delivery): self.settle(delivery, Delivery.ACCEPTED) @@ -545,66 +567,86 @@ class IncomingMessageHandler(Handler): delivery.update(state) delivery.settle() - def on_message(self, event): pass - def on_settled(self, event): pass - def auto_accept(self): return True +class IncomingMessageHandler(Handler, Acking): + def __init__(self, auto_accept=True, delegate=None): + self.delegate = delegate + self.auto_accept = auto_accept -class ClientEndpointHandler(Handler): + def on_delivery(self, event): + dlv = event.delivery + if dlv.released or not dlv.link.is_receiver: return + if dlv.readable and not dlv.partial: + event.message = recv_msg(dlv) + try: + self.on_message(event) + if self.auto_accept: + dlv.update(Delivery.ACCEPTED) + dlv.settle() + except Reject: + dlv.update(Delivery.REJECTED) + dlv.settle() + elif dlv.updated and dlv.settled: + self.on_settled(event) + + def on_message(self, event): + if self.delegate: + dispatch(self.delegate, 'on_message', event) + + def on_settled(self, event): + if self.delegate: + dispatch(self.delegate, 'on_settled', event) + +class EndpointStateHandler(Handler): + def __init__(self, peer_close_is_error=False, delegate=None): + self.delegate = delegate + self.peer_close_is_error = peer_close_is_error def is_local_open(self, endpoint): return endpoint.state & Endpoint.LOCAL_ACTIVE + def is_local_uninitialised(self, endpoint): + return endpoint.state & Endpoint.LOCAL_UNINIT + + def is_local_closed(self, endpoint): + return endpoint.state & Endpoint.LOCAL_CLOSED + def is_remote_open(self, endpoint): return endpoint.state & Endpoint.REMOTE_ACTIVE def is_remote_closed(self, endpoint): return endpoint.state & Endpoint.REMOTE_CLOSED - def was_closed_by_peer(self, endpoint, parent=None): - if parent: - return self.was_closed_by_peer(parent) and self.was_closed_by_peer(endpoint) - else: - return self.is_local_open(endpoint) and self.is_remote_closed(endpoint) - - def treat_as_error(self, endpoint, parent=None): - return endpoint.remote_condition or self.was_closed_by_peer(endpoint, parent) - def print_error(self, endpoint, endpoint_type): if endpoint.remote_condition: print endpoint.remote_condition.description - elif self.was_closed_by_peer(endpoint): + elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint): print "%s closed by peer" % endpoint_type def on_link_remote_close(self, event): - if self.treat_as_error(event.link, event.connection): + if event.link.remote_condition: self.on_link_error(event) - else: + elif self.is_local_closed(event.link): self.on_link_closed(event) + else: + self.on_link_closing(event) + event.link.close() def on_session_remote_close(self, event): - if self.treat_as_error(event.session, event.connection): + if event.session.remote_condition: self.on_session_error(event) - else: + elif self.is_local_closed(event.session): self.on_session_closed(event) + else: + self.on_session_closing(event) + event.session.close() def on_connection_remote_close(self, event): - if self.treat_as_error(event.connection): + if event.connection.remote_condition: self.on_connection_error(event) - else: + elif self.is_local_closed(event.connection): self.on_connection_closed(event) - - def on_connection_error(self, event): - self.print_error(event.connection, "connection") - event.connection.close() - - def on_session_error(self, event): - self.print_error(event.session, "session") - event.session.close() - event.connection.close() - - def on_link_error(self, event): - self.print_error(event.link, "link") - event.link.close() + else: + self.on_connection_closing(event) event.connection.close() def on_connection_local_open(self, event): @@ -614,6 +656,9 @@ class ClientEndpointHandler(Handler): def on_connection_remote_open(self, event): if self.is_local_open(event.connection): self.on_connection_opened(event) + elif self.is_local_uninitialised(event.connection): + self.on_connection_opening(event) + event.connection.open() def on_session_local_open(self, event): if self.is_remote_open(event.session): @@ -622,6 +667,9 @@ class ClientEndpointHandler(Handler): def on_session_remote_open(self, event): if self.is_local_open(event.session): self.on_session_opened(event) + elif self.is_local_uninitialised(event.session): + self.on_session_opening(event) + event.session.open() def on_link_local_open(self, event): if self.is_remote_open(event.link): @@ -630,37 +678,95 @@ class ClientEndpointHandler(Handler): def on_link_remote_open(self, event): if self.is_local_open(event.link): self.on_link_opened(event) + elif self.is_local_uninitialised(event.link): + self.on_link_opening(event) + event.link.open() def on_connection_opened(self, event): - pass + if self.delegate: + dispatch(self.delegate, 'on_connection_opened', event) def on_session_opened(self, event): - pass + if self.delegate: + dispatch(self.delegate, 'on_session_opened', event) def on_link_opened(self, event): - pass + if self.delegate: + dispatch(self.delegate, 'on_link_opened', event) + + def on_connection_opening(self, event): + if self.delegate: + dispatch(self.delegate, 'on_connection_opening', event) + + def on_session_opening(self, event): + if self.delegate: + dispatch(self.delegate, 'on_session_opening', event) + + def on_link_opening(self, event): + if self.delegate: + dispatch(self.delegate, 'on_link_opening', event) + + def on_connection_error(self, event): + if self.delegate: + dispatch(self.delegate, 'on_connection_error', event) + else: + self.print_error(event.connection, "connection") + + def on_session_error(self, event): + if self.delegate: + dispatch(self.delegate, 'on_session_error', event) + else: + self.print_error(event.session, "session") + event.connection.close() + + def on_link_error(self, event): + if self.delegate: + dispatch(self.delegate, 'on_link_error', event) + else: + self.print_error(event.link, "link") + event.connection.close() def on_connection_closed(self, event): - pass + if self.delegate: + dispatch(self.delegate, 'on_connection_closed', event) def on_session_closed(self, event): - pass + if self.delegate: + dispatch(self.delegate, 'on_session_closed', event) def on_link_closed(self, event): - pass + if self.delegate: + dispatch(self.delegate, 'on_link_closed', event) -class ClientHandler(ClientEndpointHandler, IncomingMessageHandler, OutgoingMessageHandler): + def on_connection_closing(self, event): + if self.delegate: + dispatch(self.delegate, 'on_connection_closing', event) + elif self.peer_close_is_error: + self.on_connection_error(event) - def __init__(self): - super(ClientHandler, self).__init__() + def on_session_closing(self, event): + if self.delegate: + dispatch(self.delegate, 'on_session_closing', event) + elif self.peer_close_is_error: + self.on_session_error(event) - def on_delivery(self, event): - IncomingMessageHandler.on_delivery(self, event) - OutgoingMessageHandler.on_delivery(self, event) + def on_link_closing(self, event): + if self.delegate: + dispatch(self.delegate, 'on_link_closing', event) + elif self.peer_close_is_error: + self.on_link_error(event) - def on_settled(self, event): - IncomingMessageHandler.on_settled(self, event) - OutgoingMessageHandler.on_settled(self, event) +class MessagingHandler(Handler, Acking): + def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, peer_close_is_error=False): + self.handlers = [] + # FlowController if used needs to see event before + # IncomingMessageHandler, as the latter may involve the + # delivery being released + if prefetch: + self.handlers.append(FlowController(prefetch)) + self.handlers.append(EndpointStateHandler(peer_close_is_error, self)) + self.handlers.append(IncomingMessageHandler(auto_accept, self)) + self.handlers.append(OutgoingMessageHandler(auto_settle, self)) def delivery_tags(): count = 1 @@ -682,18 +788,7 @@ def send_msg(sender, msg, tag=None, handler=None, transaction=None): def _send_msg(self, msg, tag=None, handler=None, transaction=None): return send_msg(self, msg, tag, handler, transaction) -class TransactionHandler(OutgoingMessageHandler): - def on_settled(self, event): - if hasattr(event.delivery, "transaction"): - event.transaction = event.delivery.transaction - event.delivery.transaction.handle_outcome(event) - - def on_transaction_declared(self, event): pass - def on_transaction_committed(self, event): pass - def on_transaction_aborted(self, event): pass - def on_transaction_declare_failed(self, event): pass - def on_transaction_commit_failed(self, event): pass - +class TransactionalAcking(object): def accept(self, delivery, transaction): self.settle(delivery, transaction, PN_ACCEPTED) @@ -703,17 +798,48 @@ class TransactionHandler(OutgoingMessageHandler): delivery.update(0x34) delivery.settle() -class TransactionalClientHandler(ClientEndpointHandler, TransactionHandler, IncomingMessageHandler): - def __init__(self): - super(TransactionalClientHandler, self).__init__() - - def on_delivery(self, event): - IncomingMessageHandler.on_delivery(self, event) - TransactionHandler.on_delivery(self, event) +class TransactionHandler(OutgoingMessageHandler, TransactionalAcking): + def __init__(self, auto_settle=True, delegate=None): + super(TransactionHandler, self).__init__(auto_settle, delegate) def on_settled(self, event): - IncomingMessageHandler.on_settled(self, event) - TransactionHandler.on_settled(self, event) + if hasattr(event.delivery, "transaction"): + event.transaction = event.delivery.transaction + event.delivery.transaction.handle_outcome(event) + + def on_transaction_declared(self, event): + if self.delegate: + dispatch(self.delegate, 'on_transaction_declared', event) + + def on_transaction_committed(self, event): + if self.delegate: + dispatch(self.delegate, 'on_transaction_committed', event) + + def on_transaction_aborted(self, event): + if self.delegate: + dispatch(self.delegate, 'on_transaction_aborted', event) + + def on_transaction_declare_failed(self, event): + if self.delegate: + dispatch(self.delegate, 'on_transaction_declare_failed', event) + + def on_transaction_commit_failed(self, event): + if self.delegate: + dispatch(self.delegate, 'on_transaction_commit_failed', event) + +class TransactionalClientHandler(Handler, TransactionalAcking): + def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, peer_close_is_error=False): + super(TransactionalClientHandler, self).__init__() + self.handlers = [] + # FlowController if used needs to see event before + # IncomingMessageHandler, as the latter may involve the + # delivery being released + if prefetch: + self.handlers.append(FlowController(prefetch)) + self.handlers.append(EndpointStateHandler(peer_close_is_error, self)) + self.handlers.append(IncomingMessageHandler(auto_accept, self)) + self.handlers.append(TransactionHandler(auto_settle, self)) + class Transaction(object): def __init__(self, txn_ctrl, handler): @@ -1011,11 +1137,9 @@ class Urls(object): class EventLoop(object): def __init__(self, *handlers): self.connector = Connector() - if handlers: - l = handlers + (self.connector, ScopedHandler()) - else: - l = [FlowController(10), self.connector, ScopedHandler()] - self.events = ScheduledEvents(*l) + h = [self.connector, ScopedHandler()] + h.extend(nested_handlers(handlers)) + self.events = ScheduledEvents(*h) self.loop = SelectLoop(self.events) self.connector.attach_to(self) self.trigger = None @@ -1056,6 +1180,7 @@ class EventLoop(object): self.loop.remove(selectable) def run(self): + self.events.dispatch(StartEvent(self)) self.loop.run() def stop(self): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/proton_tornado.py ---------------------------------------------------------------------- diff --git a/tutorial/proton_tornado.py b/tutorial/proton_tornado.py index 470eafa..0275169 100644 --- a/tutorial/proton_tornado.py +++ b/tutorial/proton_tornado.py @@ -18,7 +18,7 @@ # under the License. # -from proton_events import ApplicationEvent, EventLoop +from proton_events import ApplicationEvent, EventLoop, StartEvent import tornado.ioloop class TornadoLoop(EventLoop): @@ -41,6 +41,7 @@ class TornadoLoop(EventLoop): self.loop.remove_handler(conn) def run(self): + self.events.dispatch(StartEvent(self)) self.loop.start() def stop(self): @@ -50,8 +51,10 @@ class TornadoLoop(EventLoop): flags = 0 if conn.reading(): flags |= tornado.ioloop.IOLoop.READ - if conn.writing(): - flags |= tornado.ioloop.IOLoop.WRITE + # FIXME: need way to update flags to avoid busy loop + #if conn.writing(): + # flags |= tornado.ioloop.IOLoop.WRITE + flags |= tornado.ioloop.IOLoop.WRITE return flags def _connection_ready(self, conn, events): @@ -59,9 +62,9 @@ class TornadoLoop(EventLoop): conn.readable() if events & tornado.ioloop.IOLoop.WRITE: conn.writable() - if events & tornado.ioloop.IOLoop.ERROR or conn.closed(): - conn.close() + if events & tornado.ioloop.IOLoop.ERROR:# or conn.closed(): self.loop.remove_handler(conn) + conn.close() conn.removed() self.events.process() self.loop.update_handler(conn, self._get_event_flags(conn)) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/server.py ---------------------------------------------------------------------- diff --git a/tutorial/server.py b/tutorial/server.py index b567179..c426fc3 100755 --- a/tutorial/server.py +++ b/tutorial/server.py @@ -19,16 +19,24 @@ # from proton import Message -from proton_events import EventLoop, ClientHandler +from proton_events import EventLoop, MessagingHandler -class Server(ClientHandler): - def __init__(self, eventloop, host, address): - self.eventloop = eventloop - self.conn = eventloop.connect(host, handler=self) - self.receiver = self.conn.create_receiver(address, handler=self) +class Server(MessagingHandler): + def __init__(self, host, address): + super(Server, self).__init__() + self.host = host + self.address = address + + def on_start(self, event): + self.conn = event.reactor.connect(self.host) + self.receiver = self.conn.create_receiver(self.address) self.senders = {} self.relay = None + def on_connection_opened(self, event): + if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: + self.relay = self.conn.create_sender(None) + def on_message(self, event): sender = self.relay if not sender: @@ -38,15 +46,8 @@ class Server(ClientHandler): self.senders[event.message.reply_to] = sender sender.send_msg(Message(address=event.message.reply_to, body=event.message.body.upper())) - def on_connection_opened(self, event): - if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: - self.relay = self.conn.create_sender(None) - - def run(self): - self.eventloop.run() - try: - Server(EventLoop(), "localhost:5672", "examples").run() + EventLoop(Server("localhost:5672", "examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/server_tx.py ---------------------------------------------------------------------- diff --git a/tutorial/server_tx.py b/tutorial/server_tx.py index cb7c3a2..c366d08 100755 --- a/tutorial/server_tx.py +++ b/tutorial/server_tx.py @@ -19,14 +19,15 @@ # from proton import Message -from proton_events import EventLoop, ClientHandler, TransactionHandler +from proton_events import EventLoop, MessagingHandler, TransactionHandler class TxRequest(TransactionHandler): - def __init__(self, response, sender, request_delivery, conn): + def __init__(self, response, sender, request_delivery, context): + super(TxRequest, self).__init__() self.response = response self.sender = sender self.request_delivery = request_delivery - self.conn = conn + self.context = context def on_transaction_declared(self, event): self.sender.send_msg(self.response, transaction=event.transaction) @@ -40,39 +41,35 @@ class TxRequest(TransactionHandler): print "Request processing aborted" -class TxServer(ClientHandler): +class TxServer(MessagingHandler): def __init__(self, host, address): - self.eventloop = EventLoop() - self.conn = self.eventloop.connect(host, handler=self, reconnect=False) - self.receiver = self.conn.create_receiver(address, handler=self) + super(TxServer, self).__init__(auto_accept=False) + self.host = host + self.address = address + + def on_start(self, event): + self.context = event.reactor.connect(self.host, reconnect=False) + self.receiver = self.context.create_receiver(self.address) self.senders = {} self.relay = None - def auto_accept(self): return False - - def on_link_open(self, event): - self.conn.create_transaction() - def on_message(self, event): sender = self.relay if not sender: sender = self.senders.get(event.message.reply_to) if not sender: - sender = self.conn.create_sender(event.message.reply_to) + sender = self.context.create_sender(event.message.reply_to) self.senders[event.message.reply_to] = sender response = Message(address=event.message.reply_to, body=event.message.body.upper()) - self.conn.declare_transaction(handler=TxRequest(response, sender, event.delivery, self.conn)) + self.context.declare_transaction(handler=TxRequest(response, sender, event.delivery, self.context)) def on_connection_open(self, event): if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: - self.relay = self.conn.create_sender(None) - - def run(self): - self.eventloop.run() + self.relay = self.context.create_sender(None) try: - TxServer("localhost:5672", "examples").run() + EventLoop(TxServer("localhost:5672", "examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/simple_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/simple_recv.py b/tutorial/simple_recv.py index f0524d8..7b58553 100755 --- a/tutorial/simple_recv.py +++ b/tutorial/simple_recv.py @@ -18,16 +18,23 @@ # under the License. # -import proton_events +from proton_events import EventLoop, MessagingHandler + +class Recv(MessagingHandler): + def __init__(self, host, address): + super(Recv, self).__init__() + self.host = host + self.address = address + + def on_start(self, event): + conn = event.reactor.connect(self.host) + conn.create_receiver(self.address) -class Recv(proton_events.ClientHandler): def on_message(self, event): print event.message.body try: - conn = proton_events.connect("localhost:5672", handler=Recv()) - conn.create_receiver("examples") - proton_events.run() + EventLoop(Recv("localhost:5672", "examples")).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/simple_send.py ---------------------------------------------------------------------- diff --git a/tutorial/simple_send.py b/tutorial/simple_send.py index f7ad67e..7c86718 100755 --- a/tutorial/simple_send.py +++ b/tutorial/simple_send.py @@ -19,18 +19,25 @@ # from proton import Message -import proton_events +from proton_events import EventLoop, MessagingHandler -class Send(proton_events.ClientHandler): - def __init__(self, messages): +class Send(MessagingHandler): + def __init__(self, host, address, messages): + super(Send, self).__init__() + self.host = host + self.address = address self.sent = 0 self.confirmed = 0 self.total = messages + def on_start(self, event): + conn = event.reactor.connect(self.host) + conn.create_sender(self.address) + def on_credit(self, event): - while event.link.credit and self.sent < self.total: + while event.sender.credit and self.sent < self.total: msg = Message(body={'sequence':(self.sent+1)}) - event.link.send_msg(msg) + event.sender.send_msg(msg) self.sent += 1 def on_accepted(self, event): @@ -43,7 +50,5 @@ class Send(proton_events.ClientHandler): self.sent = self.confirmed try: - conn = proton_events.connect("localhost:5672", handler=Send(10000)) - conn.create_sender("examples") - proton_events.run() + EventLoop(Send("localhost:5672", "examples", 10000)).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/tx_recv.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_recv.py b/tutorial/tx_recv.py index 9a34563..7d87c94 100755 --- a/tutorial/tx_recv.py +++ b/tutorial/tx_recv.py @@ -22,6 +22,7 @@ import proton_events class TxRecv(proton_events.TransactionalClientHandler): def __init__(self, batch_size): + super(TxRecv, self).__init__(prefetch=0) self.current_batch = 0 self.batch_size = batch_size self.event_loop = proton_events.EventLoop(self) @@ -42,8 +43,6 @@ class TxRecv(proton_events.TransactionalClientHandler): self.receiver.flow(self.batch_size) self.transaction = event.transaction - def auto_accept(self): return False - def on_transaction_committed(self, event): self.current_batch = 0 self.conn.declare_transaction(handler=self) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/tx_send.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_send.py b/tutorial/tx_send.py index eb719cb..7fc3951 100755 --- a/tutorial/tx_send.py +++ b/tutorial/tx_send.py @@ -23,8 +23,10 @@ import proton_events class TxSend(proton_events.TransactionalClientHandler): def __init__(self, messages, batch_size): + super(TxSend, self).__init__() self.current_batch = 0 self.committed = 0 + self.confirmed = 0 self.total = messages self.batch_size = batch_size self.conn = proton_events.connect("localhost:5672", handler=self) @@ -68,6 +70,5 @@ class TxSend(proton_events.TransactionalClientHandler): proton_events.run() try: - #TxSend(10000, 10).run() - TxSend(9, 3).run() + TxSend(10000, 10).run() except KeyboardInterrupt: pass http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/tx_send_sync.py ---------------------------------------------------------------------- diff --git a/tutorial/tx_send_sync.py b/tutorial/tx_send_sync.py index 7064189..66679f9 100755 --- a/tutorial/tx_send_sync.py +++ b/tutorial/tx_send_sync.py @@ -23,6 +23,7 @@ import proton_events class TxSend(proton_events.TransactionalClientHandler): def __init__(self, messages, batch_size): + super(TxSend, self).__init__() self.current_batch = 0 self.confirmed = 0 self.committed = 0 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
