Author: gsim
Date: Wed Oct 22 19:12:35 2014
New Revision: 1633685

URL: http://svn.apache.org/r1633685
Log:
Add support for transactions along with example

Added:
    qpid/proton/branches/examples/tutorial/server_tx.py   (with props)
Modified:
    qpid/proton/branches/examples/tutorial/proton_events.py

Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1633685&r1=1633684&r2=1633685&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Oct 22 19:12:35 
2014
@@ -17,7 +17,8 @@
 # under the License.
 #
 import heapq, os, Queue, re, socket, time, types
-from proton import Collector, Connection, Delivery, Endpoint, Event, Timeout
+from proton import generate_uuid, PN_ACCEPTED, SASL, symbol
+from proton import Collector, Connection, Delivery, Described, Endpoint, 
Event, Link, Terminus, Timeout
 from proton import Message, Handler, ProtonException, Transport, 
TransportException, ConnectionException
 from select import select
 
@@ -35,18 +36,23 @@ class AmqpConnection(object):
         self.read_done = False
         self._closed = False
 
-    def accept(self):
-        #TODO: use SASL if requested by peer
-        #sasl = self.transport.sasl()
-        #sasl.mechanisms("ANONYMOUS")
-        #sasl.server()
-        #sasl.done(SASL.OK)
+    def accept(self, force_sasl=True):
+        if force_sasl:
+            sasl = self.transport.sasl()
+            sasl.mechanisms("ANONYMOUS")
+            sasl.server()
+            sasl.done(SASL.OK)
+        #TODO: use SASL anyway if requested by peer
         return self
 
-    def connect(self, host, port=None, username=None, password=None):
+    def connect(self, host, port=None, username=None, password=None, 
force_sasl=True):
         if username and password:
             sasl = self.transport.sasl()
             sasl.plain(username, password)
+        elif force_sasl:
+            sasl = self.transport.sasl()
+            sasl.mechanisms('ANONYMOUS')
+            sasl.client()
         try:
             self.socket.connect_ex((host, port or 5672))
         except socket.gaierror, e:
@@ -175,7 +181,6 @@ class Acceptor:
     def readable(self):
         sock, addr = self.socket.accept()
         if sock:
-            #self.selectables.append(AmqpConnection(self.events.connection(), 
sock, self.events).accept())
             self.loop.add(AmqpConnection(self.events.connection(), sock, 
self.events).accept())
 
     def removed(self): pass
@@ -642,16 +647,90 @@ def delivery_tags():
         yield str(count)
         count += 1
 
-def send_msg(sender, msg, tag=None, handler=None):
+def send_msg(sender, msg, tag=None, handler=None, transaction=None):
     dlv = sender.delivery(tag or next(sender.tags))
+    if transaction:
+        dlv.local.data = Described(symbol(u'amqp:transactional-state:list'), 
[transaction.id])
+        dlv.update(0x34)
     if handler:
         dlv.context = handler
     sender.send(msg.encode())
     sender.advance()
     return dlv
 
-def _send_msg(self, msg, tag=None, handler=None):
-    return send_msg(self, msg, tag, handler)
+def _send_msg(self, msg, tag=None, handler=None, transaction=None):
+    return send_msg(self, msg, tag, handler, transaction)
+
+class TxHandler(OutgoingMessageHandler):
+    def on_settled(self, event):
+        if hasattr(event.delivery, "transaction"):
+            event.transaction = event.delivery.transaction
+            event.delivery.transaction.handle_outcome(event)
+
+    def on_transaction_declared(self, event): pass
+    def on_transaction_committed(self, event): pass
+    def on_transaction_aborted(self, event): pass
+    def on_transaction_declare_failed(self, event): pass
+    def on_transaction_commit_failed(self, event): pass
+
+    def accept(self, delivery, transaction):
+        self.settle(delivery, transaction, PN_ACCEPTED)
+
+    def settle(self, delivery, transaction, state=None):
+        if state:
+            delivery.local.data = 
Described(symbol(u'amqp:transactional-state:list'), [transaction.id, state])
+            delivery.update(0x34)
+        delivery.settle()
+
+
+class Transaction(object):
+    def __init__(self, txn_ctrl, handler):
+        self.txn_ctrl = txn_ctrl
+        self.handler = handler
+        self.id = None
+        self._declare = None
+        self._discharge = None
+        self.failed = False
+        self.declare()
+
+    def commit(self):
+        self.discharge(False)
+
+    def abort(self):
+        self.discharge(True)
+
+    def declare(self):
+        self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None])
+
+    def discharge(self, failed):
+        self.failed = failed
+        self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), 
[self.id, failed])
+
+    def _send_ctrl(self, descriptor, value):
+        delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, 
value)), handler=self.handler)
+        delivery.transaction = self
+        return delivery
+
+    def handle_outcome(self, event):
+        if event.delivery == self._declare:
+            if event.delivery.remote.data:
+                self.id = event.delivery.remote.data[0]
+                self.handler.on_transaction_declared(event)
+            elif event.delivery.remote_state == Delivery.REJECTED:
+                self.handler.on_transaction_declare_failed(event)
+            else:
+                print "Unexpected outcome for declare: %s" % 
event.delivery.remote_state
+                self.handler.on_transaction_declare_failed(event)
+        elif event.delivery == self._discharge:
+            if event.delivery.remote_state == Delivery.REJECTED:
+                if not self.failed:
+                    self.handler.on_transaction_commit_failed(event)
+            else:
+                if self.failed:
+                    self.handler.on_transaction_aborted(event)
+                else:
+                    self.handler.on_transaction_committed(event)
+
 
 class MessagingContext(object):
     def __init__(self, conn, handler=None, ssn=None):
@@ -660,6 +739,7 @@ class MessagingContext(object):
             self.conn.context = handler
         self.conn._mc = self
         self.ssn = ssn
+        self.txn_ctrl = None
 
     def _get_handler(self):
         return self.conn.context
@@ -671,9 +751,11 @@ class MessagingContext(object):
 
     def create_sender(self, target, source=None, name=None, handler=None, 
tags=None):
         snd = self._get_ssn().sender(name or self._get_id(target, source))
+        snd.snd_settle_mode = Link.SND_SETTLED
         if source:
             snd.source.address = source
-        snd.target.address = target
+        if target:
+            snd.target.address = target
         if handler:
             snd.context = handler
         snd.tags = tags or delivery_tags()
@@ -683,7 +765,8 @@ class MessagingContext(object):
 
     def create_receiver(self, source, target=None, name=None, dynamic=False, 
handler=None):
         rcv = self._get_ssn().receiver(name or self._get_id(source, target))
-        rcv.source.address = source
+        if source:
+            rcv.source.address = source
         if dynamic:
             rcv.source.dynamic = True
         if target:
@@ -696,6 +779,13 @@ class MessagingContext(object):
     def create_session(self):
         return MessageContext(conn=None, ssn=self._new_ssn())
 
+    def declare_transaction(self, handler=None):
+        if not self.txn_ctrl:
+            self.txn_ctrl = self.create_sender(None, name="txn-ctrl")
+            self.txn_ctrl.target.type = Terminus.COORDINATOR
+            
self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions'))
+        return Transaction(self.txn_ctrl, handler)
+
     def close(self):
         if self.ssn:
             self.ssn.close()
@@ -703,9 +793,10 @@ class MessagingContext(object):
             self.conn.close()
 
     def _get_id(self, remote, local):
-        if local: "%s-%s" % (remote, local)
-        elif remote: return remote
-        else: return "temp"
+        if local and remote: "%s-%s-%s" % (self.conn.container, remote, local)
+        elif local: return "%s-%s" % (self.conn.container, local)
+        elif remote: return "%s-%s" % (self.conn.container, remote)
+        else: return "%s-%s" % (self.conn.container, str(generate_uuid()))
 
     def _get_ssn(self):
         if not self.ssn:
@@ -853,9 +944,11 @@ class EventLoop(object):
         self.loop = SelectLoop(self.events)
         self.connector.attach_to(self)
         self.trigger = None
+        self.container_id = str(generate_uuid())
 
     def connect(self, url=None, urls=None, address=None, handler=None, 
reconnect=None):
         context = MessagingContext(self.events.connection(), handler=handler)
+        context.conn.container = self.container_id or str(generate_uuid())
         if url: context.conn.address = Url(url)
         elif urls: context.conn.address = Urls(urls)
         elif address: context.conn.address = address

Added: qpid/proton/branches/examples/tutorial/server_tx.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server_tx.py?rev=1633685&view=auto
==============================================================================
--- qpid/proton/branches/examples/tutorial/server_tx.py (added)
+++ qpid/proton/branches/examples/tutorial/server_tx.py Wed Oct 22 19:12:35 2014
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from proton import Message
+from proton_events import EventLoop, ClientHandler, TxHandler
+
+class TxRequest(TxHandler):
+    def __init__(self, response, sender, request_delivery, conn):
+        self.response = response
+        self.sender = sender
+        self.request_delivery = request_delivery
+        self.conn = conn
+
+    def on_transaction_declared(self, event):
+        self.sender.send_msg(self.response, transaction=event.transaction)
+        self.accept(self.request_delivery, transaction=event.transaction)
+        event.transaction.commit()
+
+    def on_transaction_committed(self, event):
+        print "Request processed successfully"
+
+    def on_transaction_aborted(self, event):
+        print "Request processing aborted"
+
+
+class TxServer(ClientHandler):
+    def __init__(self, host, address):
+        self.eventloop = EventLoop()
+        self.conn = self.eventloop.connect(host, handler=self, reconnect=False)
+        self.receiver = self.conn.create_receiver(address, handler=self)
+        self.senders = {}
+        self.relay = None
+
+    def auto_accept(self): return False
+
+    def on_link_open(self, event):
+        self.conn.create_transaction()
+
+    def on_message(self, event):
+        sender = self.relay
+        if not sender:
+            sender = self.senders.get(event.message.reply_to)
+        if not sender:
+            sender = self.conn.create_sender(event.message.reply_to)
+            self.senders[event.message.reply_to] = sender
+
+        response = Message(address=event.message.reply_to, 
body=event.message.body.upper())
+        self.conn.declare_transaction(handler=TxRequest(response, sender, 
event.delivery, self.conn))
+
+    def on_connection_open(self, event):
+        if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' 
in event.connection.remote_offered_capabilities:
+            self.relay = self.conn.create_sender(None)
+
+    def run(self):
+        self.eventloop.run()
+
+try:
+    TxServer("localhost:5672", "examples").run()
+except KeyboardInterrupt: pass
+
+
+

Propchange: qpid/proton/branches/examples/tutorial/server_tx.py
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to