Repository: qpid-proton
Updated Branches:
  refs/heads/examples 020bf1694 -> 6aa4b4c8d


Some fixes and refactorings


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6aa4b4c8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6aa4b4c8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6aa4b4c8

Branch: refs/heads/examples
Commit: 6aa4b4c8d108f2027bd0d9bd3247a616ae59366b
Parents: 020bf16
Author: Gordon Sim <[email protected]>
Authored: Wed Nov 19 17:23:09 2014 +0000
Committer: Gordon Sim <[email protected]>
Committed: Wed Nov 19 17:23:09 2014 +0000

----------------------------------------------------------------------
 proton-c/bindings/python/proton.py    |   2 +-
 tutorial/client.py                    |  33 +--
 tutorial/client_http.py               |  68 ++++--
 tutorial/db_recv.py                   |  19 +-
 tutorial/db_send.py                   |  22 +-
 tutorial/helloworld.py                |  17 +-
 tutorial/helloworld_alt.py            |  48 ----
 tutorial/helloworld_direct.py         |  33 ++-
 tutorial/helloworld_direct_alt.py     |  55 -----
 tutorial/helloworld_direct_tornado.py |  34 ++-
 tutorial/helloworld_simple.py         |   5 +-
 tutorial/helloworld_simplistic.py     |   4 +-
 tutorial/helloworld_tornado.py        |  23 +-
 tutorial/proton_events.py             | 339 ++++++++++++++++++++---------
 tutorial/proton_tornado.py            |  13 +-
 tutorial/server.py                    |  29 +--
 tutorial/server_tx.py                 |  35 ++-
 tutorial/simple_recv.py               |  17 +-
 tutorial/simple_send.py               |  21 +-
 tutorial/tx_recv.py                   |   3 +-
 tutorial/tx_send.py                   |   5 +-
 tutorial/tx_send_sync.py              |   1 +
 22 files changed, 446 insertions(+), 380 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/proton-c/bindings/python/proton.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton.py 
b/proton-c/bindings/python/proton.py
index 7d98929..a8f0db6 100644
--- a/proton-c/bindings/python/proton.py
+++ b/proton-c/bindings/python/proton.py
@@ -3404,7 +3404,7 @@ def dispatch(handler, method, *args):
   elif hasattr(handler, "on_unhandled"):
     return handler.on_unhandled(method, args)
 
-class Event:
+class Event(object):
 
   CONNECTION_INIT = EventType(PN_CONNECTION_INIT, "on_connection_init")
   CONNECTION_BOUND = EventType(PN_CONNECTION_BOUND, "on_connection_bound")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/client.py
----------------------------------------------------------------------
diff --git a/tutorial/client.py b/tutorial/client.py
index 578d06a..e3b705c 100755
--- a/tutorial/client.py
+++ b/tutorial/client.py
@@ -19,22 +19,28 @@
 #
 
 from proton import Message
-from proton_events import EventLoop, ClientHandler
-
-class Client(ClientHandler):
-    def __init__(self, eventloop, host, address, requests):
-        self.eventloop = eventloop
-        self.conn = eventloop.connect(host)
-        self.sender = self.conn.create_sender(address)
-        self.receiver = self.conn.create_receiver(None, dynamic=True, 
handler=self)
+from proton_events import EventLoop, MessagingHandler
+
+class Client(MessagingHandler):
+    def __init__(self, host, address, requests):
+        super(Client, self).__init__()
+        self.host = host
+        self.address = address
         self.requests = requests
 
+    def on_start(self, event):
+        self.conn = event.reactor.connect(self.host)
+        self.sender = self.conn.create_sender(self.address)
+        self.receiver = self.conn.create_receiver(None, dynamic=True)
+
     def next_request(self):
-        req = Message(reply_to=self.receiver.remote_source.address, 
body=self.requests[0])
-        self.sender.send_msg(req)
+        if self.receiver.remote_source.address:
+            req = Message(reply_to=self.receiver.remote_source.address, 
body=self.requests[0])
+            self.sender.send_msg(req)
 
     def on_link_opened(self, event):
-        self.next_request()
+        if event.receiver == self.receiver:
+            self.next_request()
 
     def on_message(self, event):
         print "%s => %s" % (self.requests.pop(0), event.message.body)
@@ -43,13 +49,10 @@ class Client(ClientHandler):
         else:
             self.conn.close()
 
-    def run(self):
-        self.eventloop.run()
-
 REQUESTS= ["Twas brillig, and the slithy toves",
            "Did gire and gymble in the wabe.",
            "All mimsy were the borogroves,",
            "And the mome raths outgrabe."]
 
-Client(EventLoop(), "localhost:5672", "examples", REQUESTS).run()
+EventLoop(Client("localhost:5672", "examples", REQUESTS)).run()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/client_http.py
----------------------------------------------------------------------
diff --git a/tutorial/client_http.py b/tutorial/client_http.py
index f78cf39..afa0c78 100755
--- a/tutorial/client_http.py
+++ b/tutorial/client_http.py
@@ -19,14 +19,56 @@
 #
 
 from proton import Message
-from proton_events import ClientHandler
+from proton_events import MessagingHandler
 from proton_tornado import TornadoLoop
 from tornado.ioloop import IOLoop
 import tornado.web
 
-class ExampleHandler(tornado.web.RequestHandler, ClientHandler):
-    def initialize(self, loop):
-        self.loop = loop
+class Client(MessagingHandler):
+    def __init__(self, host, address):
+        super(Client, self).__init__()
+        self.host = host
+        self.address = address
+        self.sent = []
+        self.pending = []
+        self.reply_address = None
+        self.sender = None
+        self.receiver = None
+
+    def on_start(self, event):
+        context = event.reactor.connect(self.host)
+        self.sender = context.create_sender(self.address)
+        self.receiver = context.create_receiver(None, dynamic=True)
+
+    def on_link_opened(self, event):
+        if event.receiver == self.receiver:
+            self.reply_address = event.link.remote_source.address
+            self.do_request()
+
+    def on_credit(self, event):
+        self.do_request()
+
+    def on_message(self, event):
+        if self.sent:
+            request, handler = self.sent.pop(0)
+            print "%s => %s" % (request, event.message.body)
+            handler(event.message.body)
+            self.do_request()
+
+    def do_request(self):
+        if self.pending and self.reply_address and self.sender.credit:
+            request, handler = self.pending.pop(0)
+            self.sent.append((request, handler))
+            req = Message(reply_to=self.reply_address, body=request)
+            self.sender.send_msg(req)
+
+    def request(self, body, handler):
+        self.pending.append((body, handler))
+        self.do_request()
+
+class ExampleHandler(tornado.web.RequestHandler):
+    def initialize(self, client):
+        self.client = client
 
     def get(self):
         self._write_open()
@@ -35,22 +77,15 @@ class ExampleHandler(tornado.web.RequestHandler, 
ClientHandler):
 
     @tornado.web.asynchronous
     def post(self):
-        self.conn = self.loop.connect("localhost:5672")
-        self.sender = self.conn.create_sender("examples")
-        self.conn.create_receiver(None, dynamic=True, handler=self)
+        client.request(self.get_body_argument("message"), lambda x: 
self.on_response(x))
 
-    def on_link_opened(self, event):
-        req = Message(reply_to=event.link.remote_source.address, 
body=self.get_body_argument("message"))
-        self.sender.send_msg(req)
-
-    def on_message(self, event):
+    def on_response(self, body):
         self.set_header("Content-Type", "text/html")
         self._write_open()
         self._write_form()
-        self.write("Response: " + event.message.body)
+        self.write("Response: " + body)
         self._write_close()
         self.finish()
-        self.conn.close()
 
     def _write_open(self):
         self.write('<html><body>')
@@ -65,8 +100,9 @@ class ExampleHandler(tornado.web.RequestHandler, 
ClientHandler):
                    '</form>')
 
 
-loop = TornadoLoop()
-app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, 
dict(loop=loop))])
+client = Client("localhost:5672", "examples")
+loop = TornadoLoop(client)
+app = tornado.web.Application([tornado.web.url(r"/client", ExampleHandler, 
dict(client=client))])
 app.listen(8888)
 try:
     loop.run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/db_recv.py
----------------------------------------------------------------------
diff --git a/tutorial/db_recv.py b/tutorial/db_recv.py
index 9a3a881..1dfb3e3 100755
--- a/tutorial/db_recv.py
+++ b/tutorial/db_recv.py
@@ -18,22 +18,22 @@
 # under the License.
 #
 
-from proton_events import ApplicationEvent, ClientHandler, EventLoop
+from proton_events import ApplicationEvent, MessagingHandler, EventLoop
 from db_common import Db
 
-class Recv(ClientHandler):
+class Recv(MessagingHandler):
     def __init__(self, host, address):
-        self.eventloop = EventLoop()
+        super(Recv, self).__init__(auto_accept=False)
         self.host = host
         self.address = address
         self.delay = 0
-        self.db = Db("dst_db", self.eventloop.get_event_trigger())
         # TODO: load last tag from db
         self.last_id = None
-        self.conn = self.eventloop.connect(self.host, handler=self)
-        self.conn.create_receiver(self.address)
 
-    def auto_accept(self): return False
+    def on_start(self, event):
+        self.db = Db("dst_db", event.reactor.get_event_trigger())
+        context = event.reactor.connect(self.host)
+        context.create_receiver(self.address)
 
     def on_record_inserted(self, event):
         self.accept(event.delivery)
@@ -47,11 +47,8 @@ class Recv(ClientHandler):
         else:
             self.accept(event.delivery)
 
-    def run(self):
-        self.eventloop.run()
-
 try:
-    Recv("localhost:5672", "examples").run()
+    EventLoop(Recv("localhost:5672", "examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/db_send.py
----------------------------------------------------------------------
diff --git a/tutorial/db_send.py b/tutorial/db_send.py
index e15db19..4701fae 100755
--- a/tutorial/db_send.py
+++ b/tutorial/db_send.py
@@ -21,20 +21,23 @@
 import Queue
 import time
 from proton import Message
-from proton_events import ApplicationEvent, ClientHandler, EventLoop
+from proton_events import ApplicationEvent, MessagingHandler, EventLoop
 from db_common import Db
 
-class Send(ClientHandler):
+class Send(MessagingHandler):
     def __init__(self, host, address):
-        self.eventloop = EventLoop()
-        self.address = address
+        super(Send, self).__init__()
         self.host = host
+        self.address = address
         self.delay = 0
         self.sent = 0
         self.records = Queue.Queue(maxsize=50)
-        self.db = Db("src_db", self.eventloop.get_event_trigger())
-        self.conn = self.eventloop.connect(self.host, handler=self)
-        self.sender = self.conn.create_sender(self.address)
+
+    def on_start(self, event):
+        self.eventloop = event.reactor
+        self.db = Db("src_db", event.reactor.get_event_trigger())
+        context = event.reactor.connect(self.host)
+        self.sender = context.create_sender(self.address)
 
     def on_records_loaded(self, event):
         if self.records.empty() and event.subject == self.sent:
@@ -73,10 +76,7 @@ class Send(ClientHandler):
             print "Rechecking for data..."
             self.request_records()
 
-    def run(self):
-        self.eventloop.run()
-
 try:
-    Send("localhost:5672", "examples").run()
+    EventLoop(Send("localhost:5672", "examples")).run()
 except KeyboardInterrupt: pass
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld.py b/tutorial/helloworld.py
index 9dfc356..a3965a8 100755
--- a/tutorial/helloworld.py
+++ b/tutorial/helloworld.py
@@ -19,16 +19,18 @@
 #
 
 from proton import Message
-import proton_events
+from proton_events import EventLoop, MessagingHandler
 
-class HelloWorld(proton_events.ClientHandler):
+class HelloWorld(MessagingHandler):
     def __init__(self, server, address):
+        super(HelloWorld, self).__init__()
+        self.server = server
         self.address = address
-        self.conn = proton_events.connect(server, handler=self)
 
-    def on_connection_opened(self, event):
-        self.conn.create_receiver(self.address)
-        self.conn.create_sender(self.address)
+    def on_start(self, event):
+        ctxt = event.reactor.connect(self.server)
+        ctxt.create_receiver(self.address)
+        ctxt.create_sender(self.address)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))
@@ -38,6 +40,5 @@ class HelloWorld(proton_events.ClientHandler):
         print event.message.body
         event.connection.close()
 
-HelloWorld("localhost:5672", "examples")
-proton_events.run()
+EventLoop(HelloWorld("localhost:5672", "examples")).run()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_alt.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld_alt.py b/tutorial/helloworld_alt.py
deleted file mode 100755
index 93898bb..0000000
--- a/tutorial/helloworld_alt.py
+++ /dev/null
@@ -1,48 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from proton import Message
-from proton_events import ClientEndpointHandler, EventLoop, 
IncomingMessageHandler, OutgoingMessageHandler
-
-class HelloWorldReceiver(IncomingMessageHandler):
-    def on_message(self, event):
-        print event.message.body
-        event.connection.close()
-
-class HelloWorldSender(OutgoingMessageHandler):
-    def on_credit(self, event):
-        event.link.send_msg(Message(body=u"Hello World!"))
-        event.link.close()
-
-class HelloWorld(ClientEndpointHandler):
-    def __init__(self, url, address):
-        self.eventloop = EventLoop()
-        self.conn = self.eventloop.connect(url, handler=self)
-        self.address = address
-
-    def on_connection_opened(self, event):
-        self.conn.create_receiver(self.address, handler=HelloWorldReceiver())
-        self.conn.create_sender(self.address, handler=HelloWorldSender())
-
-    def run(self):
-        self.eventloop.run()
-
-HelloWorld("localhost:5672", "examples").run()
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_direct.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld_direct.py b/tutorial/helloworld_direct.py
index d311cd1..fd70c0c 100755
--- a/tutorial/helloworld_direct.py
+++ b/tutorial/helloworld_direct.py
@@ -19,35 +19,30 @@
 #
 
 from proton import Message
-from proton_events import ClientHandler, EventLoop, FlowController, 
Handshaker, IncomingMessageHandler
+from proton_events import EventLoop, MessagingHandler
 
-class HelloWorldReceiver(IncomingMessageHandler):
-    def on_message(self, event):
-        print event.message.body
-        event.connection.close()
-
-class HelloWorld(ClientHandler):
-    def __init__(self, eventloop, url, address):
-        self.eventloop = eventloop
-        self.acceptor = eventloop.listen(url)
-        self.conn = eventloop.connect(url, handler=self)
+class HelloWorld(MessagingHandler):
+    def __init__(self, server, address):
+        super(HelloWorld, self).__init__()
+        self.server = server
         self.address = address
 
-    def on_connection_opened(self, event):
-        self.conn.create_sender(self.address)
+    def on_start(self, event):
+        self.acceptor = event.reactor.listen(self.server)
+        ctxt = event.reactor.connect(self.server)
+        ctxt.create_sender(self.address)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))
         event.sender.close()
 
+    def on_message(self, event):
+        print event.message.body
+
     def on_accepted(self, event):
-        self.conn.close()
+        event.connection.close()
 
     def on_connection_closed(self, event):
         self.acceptor.close()
 
-    def run(self):
-        self.eventloop.run()
-
-eventloop = EventLoop(HelloWorldReceiver(), Handshaker(), FlowController(1))
-HelloWorld(eventloop, "localhost:8888", "examples").run()
+EventLoop(HelloWorld("localhost:8888", "examples")).run()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_direct_alt.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld_direct_alt.py 
b/tutorial/helloworld_direct_alt.py
deleted file mode 100755
index b5fa381..0000000
--- a/tutorial/helloworld_direct_alt.py
+++ /dev/null
@@ -1,55 +0,0 @@
-#!/usr/bin/env python
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-from proton import Message
-from proton_events import ClientEndpointHandler, EventLoop, FlowController, 
Handshaker, IncomingMessageHandler, OutgoingMessageHandler
-
-class HelloWorldReceiver(IncomingMessageHandler):
-    def on_message(self, event):
-        print event.message.body
-        event.connection.close()
-
-class HelloWorldSender(OutgoingMessageHandler):
-    def on_credit(self, event):
-        event.sender.send_msg(Message(body=u"Hello World!"))
-        event.sender.close()
-
-    def on_accepted(self, event):
-        event.connection.close()
-
-class HelloWorld(ClientEndpointHandler):
-    def __init__(self, eventloop, url, address):
-        self.eventloop = eventloop
-        self.acceptor = eventloop.listen(url)
-        self.conn = eventloop.connect(url, handler=self)
-        self.address = address
-
-    def on_connection_opened(self, event):
-        self.conn.create_sender(self.address, handler=HelloWorldSender())
-
-    def on_connection_closed(self, event):
-        self.acceptor.close()
-
-    def run(self):
-        self.eventloop.run()
-
-eventloop = EventLoop(HelloWorldReceiver(), Handshaker(), FlowController(1))
-HelloWorld(eventloop, "localhost:8888", "examples").run()
-

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_direct_tornado.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld_direct_tornado.py 
b/tutorial/helloworld_direct_tornado.py
index 4635850..1982f18 100755
--- a/tutorial/helloworld_direct_tornado.py
+++ b/tutorial/helloworld_direct_tornado.py
@@ -19,38 +19,34 @@
 #
 
 from proton import Message
-from proton_events import ClientHandler, FlowController, Handshaker, 
IncomingMessageHandler
+from proton_events import MessagingHandler
 from proton_tornado import TornadoLoop
 
-class HelloWorldReceiver(IncomingMessageHandler):
-    def on_message(self, event):
-        print event.message.body
-        event.connection.close()
-
-class HelloWorld(ClientHandler):
-    def __init__(self, eventloop, url, address):
-        self.eventloop = eventloop
-        self.acceptor = eventloop.listen(url)
-        self.conn = eventloop.connect(url, handler=self)
+class HelloWorld(MessagingHandler):
+    def __init__(self, server, address):
+        super(HelloWorld, self).__init__()
+        self.server = server
         self.address = address
 
-    def on_connection_opened(self, event):
-        self.conn.create_sender(self.address)
+    def on_start(self, event):
+        self.eventloop = event.reactor
+        self.acceptor = event.reactor.listen(self.server)
+        ctxt = event.reactor.connect(self.server)
+        ctxt.create_sender(self.address)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))
         event.sender.close()
 
+    def on_message(self, event):
+        print event.message.body
+
     def on_accepted(self, event):
-        self.conn.close()
+        event.connection.close()
 
     def on_connection_closed(self, event):
         self.acceptor.close()
         self.eventloop.stop()
 
-    def run(self):
-        self.eventloop.run()
-
-eventloop = TornadoLoop(HelloWorldReceiver(), Handshaker(), FlowController(1))
-HelloWorld(eventloop, "localhost:8888", "examples").run()
+TornadoLoop(HelloWorld("localhost:8888", "examples")).run()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_simple.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld_simple.py b/tutorial/helloworld_simple.py
index da56457..35c4dbd 100755
--- a/tutorial/helloworld_simple.py
+++ b/tutorial/helloworld_simple.py
@@ -21,7 +21,10 @@
 from proton import Message
 import proton_events
 
-class HelloWorld(proton_events.ClientHandler):
+class HelloWorld(proton_events.MessagingHandler):
+    def __init__(self):
+        super(HelloWorld, self).__init__()
+
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))
         event.sender.close()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_simplistic.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld_simplistic.py 
b/tutorial/helloworld_simplistic.py
index 2f5cc86..31874be 100755
--- a/tutorial/helloworld_simplistic.py
+++ b/tutorial/helloworld_simplistic.py
@@ -19,14 +19,14 @@
 #
 
 from proton import Message
-from proton_events import EventLoop, IncomingMessageHandler
+from proton_events import EventLoop, FlowController, IncomingMessageHandler
 
 class HelloWorldReceiver(IncomingMessageHandler):
     def on_message(self, event):
         print event.message.body
         event.connection.close()
 
-eventloop = EventLoop()
+eventloop = EventLoop(FlowController(1))
 conn = eventloop.connect("localhost:5672")
 conn.create_receiver("examples", handler=HelloWorldReceiver())
 sender = conn.create_sender("examples")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/helloworld_tornado.py
----------------------------------------------------------------------
diff --git a/tutorial/helloworld_tornado.py b/tutorial/helloworld_tornado.py
index 2821eb6..be251ce 100755
--- a/tutorial/helloworld_tornado.py
+++ b/tutorial/helloworld_tornado.py
@@ -19,18 +19,20 @@
 #
 
 from proton import Message
-from proton_events import ClientHandler
+from proton_events import MessagingHandler
 from proton_tornado import TornadoLoop
 
-class HelloWorld(ClientHandler):
-    def __init__(self, eventloop, url, address):
-        self.eventloop = eventloop
-        self.conn = eventloop.connect(url, handler=self)
+class HelloWorld(MessagingHandler):
+    def __init__(self, server, address):
+        super(HelloWorld, self).__init__()
+        self.server = server
         self.address = address
 
-    def on_connection_opened(self, event):
-        self.conn.create_receiver(self.address)
-        self.conn.create_sender(self.address)
+    def on_start(self, event):
+        self.eventloop = event.reactor
+        ctxt = event.reactor.connect(self.server)
+        ctxt.create_receiver(self.address)
+        ctxt.create_sender(self.address)
 
     def on_credit(self, event):
         event.sender.send_msg(Message(body=u"Hello World!"))
@@ -43,8 +45,5 @@ class HelloWorld(ClientHandler):
     def on_connection_closed(self, event):
         self.eventloop.stop()
 
-    def run(self):
-        self.eventloop.run()
-
-HelloWorld(TornadoLoop(), "localhost:5672", "examples").run()
+TornadoLoop(HelloWorld("localhost:5672", "examples")).run()
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/proton_events.py
----------------------------------------------------------------------
diff --git a/tutorial/proton_events.py b/tutorial/proton_events.py
index d80691b..a1faee1 100644
--- a/tutorial/proton_events.py
+++ b/tutorial/proton_events.py
@@ -17,7 +17,7 @@
 # under the License.
 #
 import heapq, os, Queue, re, socket, time, types
-from proton import generate_uuid, PN_ACCEPTED, SASL, symbol, ulong
+from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong
 from proton import Collector, Connection, Delivery, Described, Endpoint, 
Event, Link, Terminus, Timeout
 from proton import Message, Handler, ProtonException, Transport, 
TransportException, ConnectionException
 from select import select
@@ -224,11 +224,25 @@ class EventInjector(object):
     def removed(self): pass
     def tick(self): return None
 
+def nested_handlers(handlers):
+    # currently only allows for a single level of nesting
+    nested = []
+    for h in handlers:
+        nested.append(h)
+        if hasattr(h, 'handlers'):
+            nested.extend(getattr(h, 'handlers'))
+    return nested
+
+def add_nested_handler(handler, nested):
+    if hasattr(handler, 'handlers'):
+        getattr(handler, 'handlers').append(nested)
+    else:
+        handler.handlers = [nested]
 
 class Events(object):
-    def __init__(self, *dispatchers):
+    def __init__(self, *handlers):
         self.collector = Collector()
-        self.dispatchers = dispatchers
+        self.handlers = handlers
 
     def connection(self):
         conn = Connection()
@@ -245,8 +259,8 @@ class Events(object):
                 return
 
     def dispatch(self, event):
-        for d in self.dispatchers:
-            event.dispatch(d)
+        for h in self.handlers:
+            event.dispatch(h)
 
     @property
     def next_interval(self):
@@ -286,9 +300,14 @@ class ApplicationEvent(Event):
         return "%s(%s)" % (self.type.name,
                            ", ".join([str(o) for o in objects if o is not 
None]))
 
+class StartEvent(ApplicationEvent):
+    def __init__(self, reactor):
+        super(StartEvent, self).__init__("start")
+        self.reactor = reactor
+
 class ScheduledEvents(Events):
-    def __init__(self, *dispatchers):
-        super(ScheduledEvents, self).__init__(*dispatchers)
+    def __init__(self, *handlers):
+        super(ScheduledEvents, self).__init__(*handlers)
         self._events = []
 
     def schedule(self, deadline, event):
@@ -464,11 +483,14 @@ class ScopedHandler(Handler):
             return
         objects = [getattr(event, attr) for attr in 
self.scopes.get(event.clazz, [])]
         targets = [getattr(o, "context") for o in objects if hasattr(o, 
"context")]
-        handlers = [getattr(t, event.type.method) for t in targets if 
hasattr(t, event.type.method)]
+        handlers = [getattr(t, event.type.method) for t in 
nested_handlers(targets) if hasattr(t, event.type.method)]
         for h in handlers:
             h(event)
 
 class OutgoingMessageHandler(Handler):
+    def __init__(self, auto_settle=True, delegate=None):
+        self.auto_settle = auto_settle
+        self.delegate = delegate
 
     def on_link_flow(self, event):
         if event.link.is_sender and event.link.credit:
@@ -488,16 +510,32 @@ class OutgoingMessageHandler(Handler):
                 self.on_modified(event)
             if dlv.settled:
                 self.on_settled(event)
-            if self.auto_settle():
+            if self.auto_settle:
                 dlv.settle()
 
-    def on_credit(self, event): pass
-    def on_accepted(self, event): pass
-    def on_rejected(self, event): pass
-    def on_released(self, event): pass
-    def on_modified(self, event): pass
-    def on_settled(self, event): pass
-    def auto_settle(self): return True
+    def on_credit(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_credit', event)
+
+    def on_accepted(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_accepted', event)
+
+    def on_rejected(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_rejected', event)
+
+    def on_released(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_released', event)
+
+    def on_modified(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_modified', event)
+
+    def on_settled(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_settled', event)
 
 def recv_msg(delivery):
     msg = Message()
@@ -511,23 +549,7 @@ class Reject(ProtonException):
   """
   pass
 
-class IncomingMessageHandler(Handler):
-    def on_delivery(self, event):
-        dlv = event.delivery
-        if dlv.released or not dlv.link.is_receiver: return
-        if dlv.readable and not dlv.partial:
-            event.message = recv_msg(dlv)
-            try:
-                self.on_message(event)
-                if self.auto_accept():
-                    dlv.update(Delivery.ACCEPTED)
-                    dlv.settle()
-            except Reject:
-                dlv.update(Delivery.REJECTED)
-                dlv.settle()
-        elif dlv.updated and dlv.settled:
-            self.on_settled(event)
-
+class Acking(object):
     def accept(self, delivery):
         self.settle(delivery, Delivery.ACCEPTED)
 
@@ -545,66 +567,86 @@ class IncomingMessageHandler(Handler):
             delivery.update(state)
         delivery.settle()
 
-    def on_message(self, event): pass
-    def on_settled(self, event): pass
-    def auto_accept(self): return True
+class IncomingMessageHandler(Handler, Acking):
+    def __init__(self, auto_accept=True, delegate=None):
+        self.delegate = delegate
+        self.auto_accept = auto_accept
 
-class ClientEndpointHandler(Handler):
+    def on_delivery(self, event):
+        dlv = event.delivery
+        if dlv.released or not dlv.link.is_receiver: return
+        if dlv.readable and not dlv.partial:
+            event.message = recv_msg(dlv)
+            try:
+                self.on_message(event)
+                if self.auto_accept:
+                    dlv.update(Delivery.ACCEPTED)
+                    dlv.settle()
+            except Reject:
+                dlv.update(Delivery.REJECTED)
+                dlv.settle()
+        elif dlv.updated and dlv.settled:
+            self.on_settled(event)
+
+    def on_message(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_message', event)
+
+    def on_settled(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_settled', event)
+
+class EndpointStateHandler(Handler):
+    def __init__(self, peer_close_is_error=False, delegate=None):
+        self.delegate = delegate
+        self.peer_close_is_error = peer_close_is_error
 
     def is_local_open(self, endpoint):
         return endpoint.state & Endpoint.LOCAL_ACTIVE
 
+    def is_local_uninitialised(self, endpoint):
+        return endpoint.state & Endpoint.LOCAL_UNINIT
+
+    def is_local_closed(self, endpoint):
+        return endpoint.state & Endpoint.LOCAL_CLOSED
+
     def is_remote_open(self, endpoint):
         return endpoint.state & Endpoint.REMOTE_ACTIVE
 
     def is_remote_closed(self, endpoint):
         return endpoint.state & Endpoint.REMOTE_CLOSED
 
-    def was_closed_by_peer(self, endpoint, parent=None):
-        if parent:
-            return self.was_closed_by_peer(parent) and 
self.was_closed_by_peer(endpoint)
-        else:
-            return self.is_local_open(endpoint) and 
self.is_remote_closed(endpoint)
-
-    def treat_as_error(self, endpoint, parent=None):
-        return endpoint.remote_condition or self.was_closed_by_peer(endpoint, 
parent)
-
     def print_error(self, endpoint, endpoint_type):
         if endpoint.remote_condition:
             print endpoint.remote_condition.description
-        elif self.was_closed_by_peer(endpoint):
+        elif self.is_local_open(endpoint) and self.is_remote_closed(endpoint):
             print "%s closed by peer" % endpoint_type
 
     def on_link_remote_close(self, event):
-        if self.treat_as_error(event.link, event.connection):
+        if event.link.remote_condition:
             self.on_link_error(event)
-        else:
+        elif self.is_local_closed(event.link):
             self.on_link_closed(event)
+        else:
+            self.on_link_closing(event)
+        event.link.close()
 
     def on_session_remote_close(self, event):
-        if self.treat_as_error(event.session, event.connection):
+        if event.session.remote_condition:
             self.on_session_error(event)
-        else:
+        elif self.is_local_closed(event.session):
             self.on_session_closed(event)
+        else:
+            self.on_session_closing(event)
+        event.session.close()
 
     def on_connection_remote_close(self, event):
-        if self.treat_as_error(event.connection):
+        if event.connection.remote_condition:
             self.on_connection_error(event)
-        else:
+        elif self.is_local_closed(event.connection):
             self.on_connection_closed(event)
-
-    def on_connection_error(self, event):
-        self.print_error(event.connection, "connection")
-        event.connection.close()
-
-    def on_session_error(self, event):
-        self.print_error(event.session, "session")
-        event.session.close()
-        event.connection.close()
-
-    def on_link_error(self, event):
-        self.print_error(event.link, "link")
-        event.link.close()
+        else:
+            self.on_connection_closing(event)
         event.connection.close()
 
     def on_connection_local_open(self, event):
@@ -614,6 +656,9 @@ class ClientEndpointHandler(Handler):
     def on_connection_remote_open(self, event):
         if self.is_local_open(event.connection):
             self.on_connection_opened(event)
+        elif self.is_local_uninitialised(event.connection):
+            self.on_connection_opening(event)
+            event.connection.open()
 
     def on_session_local_open(self, event):
         if self.is_remote_open(event.session):
@@ -622,6 +667,9 @@ class ClientEndpointHandler(Handler):
     def on_session_remote_open(self, event):
         if self.is_local_open(event.session):
             self.on_session_opened(event)
+        elif self.is_local_uninitialised(event.session):
+            self.on_session_opening(event)
+            event.session.open()
 
     def on_link_local_open(self, event):
         if self.is_remote_open(event.link):
@@ -630,37 +678,95 @@ class ClientEndpointHandler(Handler):
     def on_link_remote_open(self, event):
         if self.is_local_open(event.link):
             self.on_link_opened(event)
+        elif self.is_local_uninitialised(event.link):
+            self.on_link_opening(event)
+            event.link.open()
 
     def on_connection_opened(self, event):
-        pass
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_opened', event)
 
     def on_session_opened(self, event):
-        pass
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_opened', event)
 
     def on_link_opened(self, event):
-        pass
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_opened', event)
+
+    def on_connection_opening(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_opening', event)
+
+    def on_session_opening(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_opening', event)
+
+    def on_link_opening(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_opening', event)
+
+    def on_connection_error(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_error', event)
+        else:
+            self.print_error(event.connection, "connection")
+
+    def on_session_error(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_error', event)
+        else:
+            self.print_error(event.session, "session")
+            event.connection.close()
+
+    def on_link_error(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_error', event)
+        else:
+            self.print_error(event.link, "link")
+            event.connection.close()
 
     def on_connection_closed(self, event):
-        pass
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_closed', event)
 
     def on_session_closed(self, event):
-        pass
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_closed', event)
 
     def on_link_closed(self, event):
-        pass
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_closed', event)
 
-class ClientHandler(ClientEndpointHandler, IncomingMessageHandler, 
OutgoingMessageHandler):
+    def on_connection_closing(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_connection_closing', event)
+        elif self.peer_close_is_error:
+            self.on_connection_error(event)
 
-    def __init__(self):
-        super(ClientHandler, self).__init__()
+    def on_session_closing(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_session_closing', event)
+        elif self.peer_close_is_error:
+            self.on_session_error(event)
 
-    def on_delivery(self, event):
-        IncomingMessageHandler.on_delivery(self, event)
-        OutgoingMessageHandler.on_delivery(self, event)
+    def on_link_closing(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_link_closing', event)
+        elif self.peer_close_is_error:
+            self.on_link_error(event)
 
-    def on_settled(self, event):
-        IncomingMessageHandler.on_settled(self, event)
-        OutgoingMessageHandler.on_settled(self, event)
+class MessagingHandler(Handler, Acking):
+    def __init__(self, prefetch=10, auto_accept=True, auto_settle=True, 
peer_close_is_error=False):
+        self.handlers = []
+        # FlowController if used needs to see event before
+        # IncomingMessageHandler, as the latter may involve the
+        # delivery being released
+        if prefetch:
+            self.handlers.append(FlowController(prefetch))
+        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
+        self.handlers.append(IncomingMessageHandler(auto_accept, self))
+        self.handlers.append(OutgoingMessageHandler(auto_settle, self))
 
 def delivery_tags():
     count = 1
@@ -682,18 +788,7 @@ def send_msg(sender, msg, tag=None, handler=None, 
transaction=None):
 def _send_msg(self, msg, tag=None, handler=None, transaction=None):
     return send_msg(self, msg, tag, handler, transaction)
 
-class TransactionHandler(OutgoingMessageHandler):
-    def on_settled(self, event):
-        if hasattr(event.delivery, "transaction"):
-            event.transaction = event.delivery.transaction
-            event.delivery.transaction.handle_outcome(event)
-
-    def on_transaction_declared(self, event): pass
-    def on_transaction_committed(self, event): pass
-    def on_transaction_aborted(self, event): pass
-    def on_transaction_declare_failed(self, event): pass
-    def on_transaction_commit_failed(self, event): pass
-
+class TransactionalAcking(object):
     def accept(self, delivery, transaction):
         self.settle(delivery, transaction, PN_ACCEPTED)
 
@@ -703,17 +798,48 @@ class TransactionHandler(OutgoingMessageHandler):
             delivery.update(0x34)
         delivery.settle()
 
-class TransactionalClientHandler(ClientEndpointHandler, TransactionHandler, 
IncomingMessageHandler):
-    def __init__(self):
-        super(TransactionalClientHandler, self).__init__()
-
-    def on_delivery(self, event):
-        IncomingMessageHandler.on_delivery(self, event)
-        TransactionHandler.on_delivery(self, event)
+class TransactionHandler(OutgoingMessageHandler, TransactionalAcking):
+    def __init__(self, auto_settle=True, delegate=None):
+        super(TransactionHandler, self).__init__(auto_settle, delegate)
 
     def on_settled(self, event):
-        IncomingMessageHandler.on_settled(self, event)
-        TransactionHandler.on_settled(self, event)
+        if hasattr(event.delivery, "transaction"):
+            event.transaction = event.delivery.transaction
+            event.delivery.transaction.handle_outcome(event)
+
+    def on_transaction_declared(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_declared', event)
+
+    def on_transaction_committed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_committed', event)
+
+    def on_transaction_aborted(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_aborted', event)
+
+    def on_transaction_declare_failed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_declare_failed', event)
+
+    def on_transaction_commit_failed(self, event):
+        if self.delegate:
+            dispatch(self.delegate, 'on_transaction_commit_failed', event)
+
+class TransactionalClientHandler(Handler, TransactionalAcking):
+    def __init__(self, prefetch=10, auto_accept=False, auto_settle=True, 
peer_close_is_error=False):
+        super(TransactionalClientHandler, self).__init__()
+        self.handlers = []
+        # FlowController if used needs to see event before
+        # IncomingMessageHandler, as the latter may involve the
+        # delivery being released
+        if prefetch:
+            self.handlers.append(FlowController(prefetch))
+        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
+        self.handlers.append(IncomingMessageHandler(auto_accept, self))
+        self.handlers.append(TransactionHandler(auto_settle, self))
+
 
 class Transaction(object):
     def __init__(self, txn_ctrl, handler):
@@ -1011,11 +1137,9 @@ class Urls(object):
 class EventLoop(object):
     def __init__(self, *handlers):
         self.connector = Connector()
-        if handlers:
-            l = handlers + (self.connector, ScopedHandler())
-        else:
-            l = [FlowController(10), self.connector, ScopedHandler()]
-        self.events = ScheduledEvents(*l)
+        h = [self.connector, ScopedHandler()]
+        h.extend(nested_handlers(handlers))
+        self.events = ScheduledEvents(*h)
         self.loop = SelectLoop(self.events)
         self.connector.attach_to(self)
         self.trigger = None
@@ -1056,6 +1180,7 @@ class EventLoop(object):
         self.loop.remove(selectable)
 
     def run(self):
+        self.events.dispatch(StartEvent(self))
         self.loop.run()
 
     def stop(self):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/proton_tornado.py
----------------------------------------------------------------------
diff --git a/tutorial/proton_tornado.py b/tutorial/proton_tornado.py
index 470eafa..0275169 100644
--- a/tutorial/proton_tornado.py
+++ b/tutorial/proton_tornado.py
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-from proton_events import ApplicationEvent, EventLoop
+from proton_events import ApplicationEvent, EventLoop, StartEvent
 import tornado.ioloop
 
 class TornadoLoop(EventLoop):
@@ -41,6 +41,7 @@ class TornadoLoop(EventLoop):
         self.loop.remove_handler(conn)
 
     def run(self):
+        self.events.dispatch(StartEvent(self))
         self.loop.start()
 
     def stop(self):
@@ -50,8 +51,10 @@ class TornadoLoop(EventLoop):
         flags = 0
         if conn.reading():
             flags |= tornado.ioloop.IOLoop.READ
-        if conn.writing():
-            flags |= tornado.ioloop.IOLoop.WRITE
+        # FIXME: need way to update flags to avoid busy loop
+        #if conn.writing():
+        #    flags |= tornado.ioloop.IOLoop.WRITE
+        flags |= tornado.ioloop.IOLoop.WRITE
         return flags
 
     def _connection_ready(self, conn, events):
@@ -59,9 +62,9 @@ class TornadoLoop(EventLoop):
             conn.readable()
         if events & tornado.ioloop.IOLoop.WRITE:
             conn.writable()
-        if events & tornado.ioloop.IOLoop.ERROR or conn.closed():
-            conn.close()
+        if events & tornado.ioloop.IOLoop.ERROR:# or conn.closed():
             self.loop.remove_handler(conn)
+            conn.close()
             conn.removed()
         self.events.process()
         self.loop.update_handler(conn, self._get_event_flags(conn))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/server.py
----------------------------------------------------------------------
diff --git a/tutorial/server.py b/tutorial/server.py
index b567179..c426fc3 100755
--- a/tutorial/server.py
+++ b/tutorial/server.py
@@ -19,16 +19,24 @@
 #
 
 from proton import Message
-from proton_events import EventLoop, ClientHandler
+from proton_events import EventLoop, MessagingHandler
 
-class Server(ClientHandler):
-    def __init__(self, eventloop, host, address):
-        self.eventloop = eventloop
-        self.conn = eventloop.connect(host, handler=self)
-        self.receiver = self.conn.create_receiver(address, handler=self)
+class Server(MessagingHandler):
+    def __init__(self, host, address):
+        super(Server, self).__init__()
+        self.host = host
+        self.address = address
+
+    def on_start(self, event):
+        self.conn = event.reactor.connect(self.host)
+        self.receiver = self.conn.create_receiver(self.address)
         self.senders = {}
         self.relay = None
 
+    def on_connection_opened(self, event):
+        if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' 
in event.connection.remote_offered_capabilities:
+            self.relay = self.conn.create_sender(None)
+
     def on_message(self, event):
         sender = self.relay
         if not sender:
@@ -38,15 +46,8 @@ class Server(ClientHandler):
             self.senders[event.message.reply_to] = sender
         sender.send_msg(Message(address=event.message.reply_to, 
body=event.message.body.upper()))
 
-    def on_connection_opened(self, event):
-        if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' 
in event.connection.remote_offered_capabilities:
-            self.relay = self.conn.create_sender(None)
-
-    def run(self):
-        self.eventloop.run()
-
 try:
-    Server(EventLoop(), "localhost:5672", "examples").run()
+    EventLoop(Server("localhost:5672", "examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/server_tx.py
----------------------------------------------------------------------
diff --git a/tutorial/server_tx.py b/tutorial/server_tx.py
index cb7c3a2..c366d08 100755
--- a/tutorial/server_tx.py
+++ b/tutorial/server_tx.py
@@ -19,14 +19,15 @@
 #
 
 from proton import Message
-from proton_events import EventLoop, ClientHandler, TransactionHandler
+from proton_events import EventLoop, MessagingHandler, TransactionHandler
 
 class TxRequest(TransactionHandler):
-    def __init__(self, response, sender, request_delivery, conn):
+    def __init__(self, response, sender, request_delivery, context):
+        super(TxRequest, self).__init__()
         self.response = response
         self.sender = sender
         self.request_delivery = request_delivery
-        self.conn = conn
+        self.context = context
 
     def on_transaction_declared(self, event):
         self.sender.send_msg(self.response, transaction=event.transaction)
@@ -40,39 +41,35 @@ class TxRequest(TransactionHandler):
         print "Request processing aborted"
 
 
-class TxServer(ClientHandler):
+class TxServer(MessagingHandler):
     def __init__(self, host, address):
-        self.eventloop = EventLoop()
-        self.conn = self.eventloop.connect(host, handler=self, reconnect=False)
-        self.receiver = self.conn.create_receiver(address, handler=self)
+        super(TxServer, self).__init__(auto_accept=False)
+        self.host = host
+        self.address = address
+
+    def on_start(self, event):
+        self.context = event.reactor.connect(self.host, reconnect=False)
+        self.receiver = self.context.create_receiver(self.address)
         self.senders = {}
         self.relay = None
 
-    def auto_accept(self): return False
-
-    def on_link_open(self, event):
-        self.conn.create_transaction()
-
     def on_message(self, event):
         sender = self.relay
         if not sender:
             sender = self.senders.get(event.message.reply_to)
         if not sender:
-            sender = self.conn.create_sender(event.message.reply_to)
+            sender = self.context.create_sender(event.message.reply_to)
             self.senders[event.message.reply_to] = sender
 
         response = Message(address=event.message.reply_to, 
body=event.message.body.upper())
-        self.conn.declare_transaction(handler=TxRequest(response, sender, 
event.delivery, self.conn))
+        self.context.declare_transaction(handler=TxRequest(response, sender, 
event.delivery, self.context))
 
     def on_connection_open(self, event):
         if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' 
in event.connection.remote_offered_capabilities:
-            self.relay = self.conn.create_sender(None)
-
-    def run(self):
-        self.eventloop.run()
+            self.relay = self.context.create_sender(None)
 
 try:
-    TxServer("localhost:5672", "examples").run()
+    EventLoop(TxServer("localhost:5672", "examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/simple_recv.py
----------------------------------------------------------------------
diff --git a/tutorial/simple_recv.py b/tutorial/simple_recv.py
index f0524d8..7b58553 100755
--- a/tutorial/simple_recv.py
+++ b/tutorial/simple_recv.py
@@ -18,16 +18,23 @@
 # under the License.
 #
 
-import proton_events
+from proton_events import EventLoop, MessagingHandler
+
+class Recv(MessagingHandler):
+    def __init__(self, host, address):
+        super(Recv, self).__init__()
+        self.host = host
+        self.address = address
+
+    def on_start(self, event):
+        conn = event.reactor.connect(self.host)
+        conn.create_receiver(self.address)
 
-class Recv(proton_events.ClientHandler):
     def on_message(self, event):
         print event.message.body
 
 try:
-    conn = proton_events.connect("localhost:5672", handler=Recv())
-    conn.create_receiver("examples")
-    proton_events.run()
+    EventLoop(Recv("localhost:5672", "examples")).run()
 except KeyboardInterrupt: pass
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/simple_send.py
----------------------------------------------------------------------
diff --git a/tutorial/simple_send.py b/tutorial/simple_send.py
index f7ad67e..7c86718 100755
--- a/tutorial/simple_send.py
+++ b/tutorial/simple_send.py
@@ -19,18 +19,25 @@
 #
 
 from proton import Message
-import proton_events
+from proton_events import EventLoop, MessagingHandler
 
-class Send(proton_events.ClientHandler):
-    def __init__(self, messages):
+class Send(MessagingHandler):
+    def __init__(self, host, address, messages):
+        super(Send, self).__init__()
+        self.host = host
+        self.address = address
         self.sent = 0
         self.confirmed = 0
         self.total = messages
 
+    def on_start(self, event):
+        conn = event.reactor.connect(self.host)
+        conn.create_sender(self.address)
+
     def on_credit(self, event):
-        while event.link.credit and self.sent < self.total:
+        while event.sender.credit and self.sent < self.total:
             msg = Message(body={'sequence':(self.sent+1)})
-            event.link.send_msg(msg)
+            event.sender.send_msg(msg)
             self.sent += 1
 
     def on_accepted(self, event):
@@ -43,7 +50,5 @@ class Send(proton_events.ClientHandler):
         self.sent = self.confirmed
 
 try:
-    conn = proton_events.connect("localhost:5672", handler=Send(10000))
-    conn.create_sender("examples")
-    proton_events.run()
+    EventLoop(Send("localhost:5672", "examples", 10000)).run()
 except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/tx_recv.py
----------------------------------------------------------------------
diff --git a/tutorial/tx_recv.py b/tutorial/tx_recv.py
index 9a34563..7d87c94 100755
--- a/tutorial/tx_recv.py
+++ b/tutorial/tx_recv.py
@@ -22,6 +22,7 @@ import proton_events
 
 class TxRecv(proton_events.TransactionalClientHandler):
     def __init__(self, batch_size):
+        super(TxRecv, self).__init__(prefetch=0)
         self.current_batch = 0
         self.batch_size = batch_size
         self.event_loop = proton_events.EventLoop(self)
@@ -42,8 +43,6 @@ class TxRecv(proton_events.TransactionalClientHandler):
         self.receiver.flow(self.batch_size)
         self.transaction = event.transaction
 
-    def auto_accept(self): return False
-
     def on_transaction_committed(self, event):
         self.current_batch = 0
         self.conn.declare_transaction(handler=self)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/tx_send.py
----------------------------------------------------------------------
diff --git a/tutorial/tx_send.py b/tutorial/tx_send.py
index eb719cb..7fc3951 100755
--- a/tutorial/tx_send.py
+++ b/tutorial/tx_send.py
@@ -23,8 +23,10 @@ import proton_events
 
 class TxSend(proton_events.TransactionalClientHandler):
     def __init__(self, messages, batch_size):
+        super(TxSend, self).__init__()
         self.current_batch = 0
         self.committed = 0
+        self.confirmed = 0
         self.total = messages
         self.batch_size = batch_size
         self.conn = proton_events.connect("localhost:5672", handler=self)
@@ -68,6 +70,5 @@ class TxSend(proton_events.TransactionalClientHandler):
         proton_events.run()
 
 try:
-    #TxSend(10000, 10).run()
-    TxSend(9, 3).run()
+    TxSend(10000, 10).run()
 except KeyboardInterrupt: pass

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6aa4b4c8/tutorial/tx_send_sync.py
----------------------------------------------------------------------
diff --git a/tutorial/tx_send_sync.py b/tutorial/tx_send_sync.py
index 7064189..66679f9 100755
--- a/tutorial/tx_send_sync.py
+++ b/tutorial/tx_send_sync.py
@@ -23,6 +23,7 @@ import proton_events
 
 class TxSend(proton_events.TransactionalClientHandler):
     def __init__(self, messages, batch_size):
+        super(TxSend, self).__init__()
         self.current_batch = 0
         self.confirmed = 0
         self.committed = 0


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to