Author: gsim Date: Wed Oct 22 19:12:35 2014 New Revision: 1633685 URL: http://svn.apache.org/r1633685 Log: Add support for transactions along with example
Added: qpid/proton/branches/examples/tutorial/server_tx.py (with props) Modified: qpid/proton/branches/examples/tutorial/proton_events.py Modified: qpid/proton/branches/examples/tutorial/proton_events.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1633685&r1=1633684&r2=1633685&view=diff ============================================================================== --- qpid/proton/branches/examples/tutorial/proton_events.py (original) +++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Oct 22 19:12:35 2014 @@ -17,7 +17,8 @@ # under the License. # import heapq, os, Queue, re, socket, time, types -from proton import Collector, Connection, Delivery, Endpoint, Event, Timeout +from proton import generate_uuid, PN_ACCEPTED, SASL, symbol +from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout from proton import Message, Handler, ProtonException, Transport, TransportException, ConnectionException from select import select @@ -35,18 +36,23 @@ class AmqpConnection(object): self.read_done = False self._closed = False - def accept(self): - #TODO: use SASL if requested by peer - #sasl = self.transport.sasl() - #sasl.mechanisms("ANONYMOUS") - #sasl.server() - #sasl.done(SASL.OK) + def accept(self, force_sasl=True): + if force_sasl: + sasl = self.transport.sasl() + sasl.mechanisms("ANONYMOUS") + sasl.server() + sasl.done(SASL.OK) + #TODO: use SASL anyway if requested by peer return self - def connect(self, host, port=None, username=None, password=None): + def connect(self, host, port=None, username=None, password=None, force_sasl=True): if username and password: sasl = self.transport.sasl() sasl.plain(username, password) + elif force_sasl: + sasl = self.transport.sasl() + sasl.mechanisms('ANONYMOUS') + sasl.client() try: self.socket.connect_ex((host, port or 5672)) except socket.gaierror, e: @@ -175,7 +181,6 @@ class Acceptor: def readable(self): sock, addr = 
self.socket.accept() if sock: - #self.selectables.append(AmqpConnection(self.events.connection(), sock, self.events).accept()) self.loop.add(AmqpConnection(self.events.connection(), sock, self.events).accept()) def removed(self): pass @@ -642,16 +647,90 @@ def delivery_tags(): yield str(count) count += 1 -def send_msg(sender, msg, tag=None, handler=None): +def send_msg(sender, msg, tag=None, handler=None, transaction=None): dlv = sender.delivery(tag or next(sender.tags)) + if transaction: + dlv.local.data = Described(symbol(u'amqp:transactional-state:list'), [transaction.id]) + dlv.update(0x34) if handler: dlv.context = handler sender.send(msg.encode()) sender.advance() return dlv -def _send_msg(self, msg, tag=None, handler=None): - return send_msg(self, msg, tag, handler) +def _send_msg(self, msg, tag=None, handler=None, transaction=None): + return send_msg(self, msg, tag, handler, transaction) + +class TxHandler(OutgoingMessageHandler): + def on_settled(self, event): + if hasattr(event.delivery, "transaction"): + event.transaction = event.delivery.transaction + event.delivery.transaction.handle_outcome(event) + + def on_transaction_declared(self, event): pass + def on_transaction_committed(self, event): pass + def on_transaction_aborted(self, event): pass + def on_transaction_declare_failed(self, event): pass + def on_transaction_commit_failed(self, event): pass + + def accept(self, delivery, transaction): + self.settle(delivery, transaction, PN_ACCEPTED) + + def settle(self, delivery, transaction, state=None): + if state: + delivery.local.data = Described(symbol(u'amqp:transactional-state:list'), [transaction.id, state]) + delivery.update(0x34) + delivery.settle() + + +class Transaction(object): + def __init__(self, txn_ctrl, handler): + self.txn_ctrl = txn_ctrl + self.handler = handler + self.id = None + self._declare = None + self._discharge = None + self.failed = False + self.declare() + + def commit(self): + self.discharge(False) + + def abort(self): + 
self.discharge(True) + + def declare(self): + self._declare = self._send_ctrl(symbol(u'amqp:declare:list'), [None]) + + def discharge(self, failed): + self.failed = failed + self._discharge = self._send_ctrl(symbol(u'amqp:discharge:list'), [self.id, failed]) + + def _send_ctrl(self, descriptor, value): + delivery = self.txn_ctrl.send_msg(Message(body=Described(descriptor, value)), handler=self.handler) + delivery.transaction = self + return delivery + + def handle_outcome(self, event): + if event.delivery == self._declare: + if event.delivery.remote.data: + self.id = event.delivery.remote.data[0] + self.handler.on_transaction_declared(event) + elif event.delivery.remote_state == Delivery.REJECTED: + self.handler.on_transaction_declare_failed(event) + else: + print "Unexpected outcome for declare: %s" % event.delivery.remote_state + self.handler.on_transaction_declare_failed(event) + elif event.delivery == self._discharge: + if event.delivery.remote_state == Delivery.REJECTED: + if not self.failed: + self.handler.on_transaction_commit_failed(event) + else: + if self.failed: + self.handler.on_transaction_aborted(event) + else: + self.handler.on_transaction_committed(event) + class MessagingContext(object): def __init__(self, conn, handler=None, ssn=None): @@ -660,6 +739,7 @@ class MessagingContext(object): self.conn.context = handler self.conn._mc = self self.ssn = ssn + self.txn_ctrl = None def _get_handler(self): return self.conn.context @@ -671,9 +751,11 @@ class MessagingContext(object): def create_sender(self, target, source=None, name=None, handler=None, tags=None): snd = self._get_ssn().sender(name or self._get_id(target, source)) + snd.snd_settle_mode = Link.SND_SETTLED if source: snd.source.address = source - snd.target.address = target + if target: + snd.target.address = target if handler: snd.context = handler snd.tags = tags or delivery_tags() @@ -683,7 +765,8 @@ class MessagingContext(object): def create_receiver(self, source, target=None, name=None, 
dynamic=False, handler=None): rcv = self._get_ssn().receiver(name or self._get_id(source, target)) - rcv.source.address = source + if source: + rcv.source.address = source if dynamic: rcv.source.dynamic = True if target: @@ -696,6 +779,13 @@ class MessagingContext(object): def create_session(self): return MessageContext(conn=None, ssn=self._new_ssn()) + def declare_transaction(self, handler=None): + if not self.txn_ctrl: + self.txn_ctrl = self.create_sender(None, name="txn-ctrl") + self.txn_ctrl.target.type = Terminus.COORDINATOR + self.txn_ctrl.target.capabilities.put_object(symbol(u'amqp:local-transactions')) + return Transaction(self.txn_ctrl, handler) + def close(self): if self.ssn: self.ssn.close() @@ -703,9 +793,10 @@ class MessagingContext(object): self.conn.close() def _get_id(self, remote, local): - if local: "%s-%s" % (remote, local) - elif remote: return remote - else: return "temp" + if local and remote: "%s-%s-%s" % (self.conn.container, remote, local) + elif local: return "%s-%s" % (self.conn.container, local) + elif remote: return "%s-%s" % (self.conn.container, remote) + else: return "%s-%s" % (self.conn.container, str(generate_uuid())) def _get_ssn(self): if not self.ssn: @@ -853,9 +944,11 @@ class EventLoop(object): self.loop = SelectLoop(self.events) self.connector.attach_to(self) self.trigger = None + self.container_id = str(generate_uuid()) def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None): context = MessagingContext(self.events.connection(), handler=handler) + context.conn.container = self.container_id or str(generate_uuid()) if url: context.conn.address = Url(url) elif urls: context.conn.address = Urls(urls) elif address: context.conn.address = address Added: qpid/proton/branches/examples/tutorial/server_tx.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/server_tx.py?rev=1633685&view=auto ============================================================================== --- 
qpid/proton/branches/examples/tutorial/server_tx.py (added) +++ qpid/proton/branches/examples/tutorial/server_tx.py Wed Oct 22 19:12:35 2014 @@ -0,0 +1,79 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from proton import Message +from proton_events import EventLoop, ClientHandler, TxHandler + +class TxRequest(TxHandler): + def __init__(self, response, sender, request_delivery, conn): + self.response = response + self.sender = sender + self.request_delivery = request_delivery + self.conn = conn + + def on_transaction_declared(self, event): + self.sender.send_msg(self.response, transaction=event.transaction) + self.accept(self.request_delivery, transaction=event.transaction) + event.transaction.commit() + + def on_transaction_committed(self, event): + print "Request processed successfully" + + def on_transaction_aborted(self, event): + print "Request processing aborted" + + +class TxServer(ClientHandler): + def __init__(self, host, address): + self.eventloop = EventLoop() + self.conn = self.eventloop.connect(host, handler=self, reconnect=False) + self.receiver = self.conn.create_receiver(address, handler=self) + self.senders = {} + self.relay = None + + def auto_accept(self): return False + + def 
on_link_open(self, event): + self.conn.create_transaction() + + def on_message(self, event): + sender = self.relay + if not sender: + sender = self.senders.get(event.message.reply_to) + if not sender: + sender = self.conn.create_sender(event.message.reply_to) + self.senders[event.message.reply_to] = sender + + response = Message(address=event.message.reply_to, body=event.message.body.upper()) + self.conn.declare_transaction(handler=TxRequest(response, sender, event.delivery, self.conn)) + + def on_connection_open(self, event): + if event.connection.remote_offered_capabilities and 'ANONYMOUS-RELAY' in event.connection.remote_offered_capabilities: + self.relay = self.conn.create_sender(None) + + def run(self): + self.eventloop.run() + +try: + TxServer("localhost:5672", "examples").run() +except KeyboardInterrupt: pass + + + Propchange: qpid/proton/branches/examples/tutorial/server_tx.py ------------------------------------------------------------------------------ svn:executable = * --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org