This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 96e5b8d03 PROTON-2890: [Python examples] Broker work for simple transaction support
96e5b8d03 is described below

commit 96e5b8d039a000dcc460a296c2e8892a0d8b91b3
Author: Andrew Stitcher <astitc...@apache.org>
AuthorDate: Wed May 22 18:02:36 2024 -0400

    PROTON-2890: [Python examples] Broker work for simple transaction support
    
    * Modified the Python example broker so that it understands transaction
      requests.
    
      Implemented a simple-minded transaction implementation that delays
      queueing transactioned messages until commit, and likewise delays
      processing transactioned delivery updates until commit.
    
      In the case of abort, queued messages are simply released and settled,
      and delivery updates turn into releases, which requeue the messages at
      the back of the queue (this is perhaps not a very good real-world
      plan!)
    
    * Made delayed acknowledgement etc. work by implementing handling for
      message release and modification. There is now a configurable
      redelivery limit that can be set on the broker command line.
    
    * Extensive optional output about the operations of the broker can be
      turned on by a command-line option.
---
 python/examples/broker.py | 315 ++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 266 insertions(+), 49 deletions(-)

diff --git a/python/examples/broker.py b/python/examples/broker.py
index 753b7af3f..b350654a3 100755
--- a/python/examples/broker.py
+++ b/python/examples/broker.py
@@ -18,37 +18,43 @@
 # under the License.
 #
 
-import collections
+from abc import ABC, abstractmethod
+from collections import deque, namedtuple
+from dataclasses import dataclass
 import optparse
 import uuid
 
-from proton import Endpoint
+from typing import Optional, Union
+
+from proton import (Condition, Delivery, Described, Disposition, DispositionType,
+                    Endpoint, Link, Sender, Message, Terminus, TransactionalDisposition)
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
 
 
-class Queue(object):
-    def __init__(self, dynamic=False):
-        self.dynamic = dynamic
-        self.queue = collections.deque()
-        self.consumers = []
+class Queue:
+    def __init__(self, broker: 'Broker', name: str, dynamic: bool = False):
+        self.broker = broker
+        self.name: str = name
+        self.dynamic: bool = dynamic
+        self.queue: deque[Message] = deque()
+        self.consumers: list[Sender] = []
 
-    def subscribe(self, consumer):
+    def subscribe(self, consumer: Sender):
         self.consumers.append(consumer)
 
-    def unsubscribe(self, consumer):
-        """
-        :return: True if the queue is to be deleted
-        """
+    def unsubscribe(self, consumer: Sender):
         if consumer in self.consumers:
             self.consumers.remove(consumer)
+
+    def removable(self):
         return len(self.consumers) == 0 and (self.dynamic or len(self.queue) == 0)
 
-    def publish(self, message):
+    def publish(self, message: Message):
         self.queue.append(message)
         self.dispatch()
 
-    def dispatch(self, consumer=None):
+    def dispatch(self, consumer: Optional[Sender] = None):
         if consumer:
             c = [consumer]
         else:
@@ -56,88 +62,299 @@ class Queue(object):
         while self._deliver_to(c):
             pass
 
-    def _deliver_to(self, consumers):
+    def _deliver_to(self, consumers: list[Sender]):
         try:
             result = False
             for c in consumers:
                 if c.credit:
-                    c.send(self.queue.popleft())
+                    message = self.queue.popleft()
+                    self.broker.deliver(c, message)
                     result = True
             return result
         except IndexError:  # no more messages
             return False
 
 
+UnsettledDelivery = namedtuple('UnsettledDelivery', ['address', 'message'])
+
+
+class TransactionAction(ABC):
+    @abstractmethod
+    def commit(self, broker): ...
+
+    @abstractmethod
+    def rollback(self, broker): ...
+
+
+@dataclass
+class QueueMessage(TransactionAction):
+    delivery: Delivery
+    message: Message
+    address: str
+
+    def commit(self, broker):
+        broker.publish(self.message, self.address)
+        self.delivery.update(Disposition.ACCEPTED)
+        self.delivery.settle()
+
+    def rollback(self, broker):
+        self.delivery.update(Disposition.RELEASED)
+        self.delivery.settle()
+
+
+@dataclass
+class DeliveryUpdate(TransactionAction):
+    delivery: Delivery
+    outcome: DispositionType
+
+    def commit(self, broker):
+        broker.delivery_update(self.delivery, self.outcome)
+
+    def rollback(self, broker):
+        broker.delivery_update(self.delivery, Disposition.RELEASED)
+
+
 class Broker(MessagingHandler):
-    def __init__(self, url):
-        super(Broker, self).__init__()
+    def __init__(self, url, verbose, redelivery_limit):
+        super().__init__(auto_accept=False)
+        self.verbose = verbose
         self.url = url
-        self.queues = {}
+        self.redelivery_limit = redelivery_limit
+        self.queues: dict[str, Queue] = {}
+        self.unsettled_deliveries: dict[Delivery, UnsettledDelivery] = {}
+        self.txns: dict[bytes, list[TransactionAction]] = {}
+        self.acceptor = None
 
-    def on_start(self, event):
-        self.acceptor = event.container.listen(self.url)
+    def _verbose_print(self, message):
+        if self.verbose:
+            print(message)
 
-    def _queue(self, address):
+    def _queue(self, address, dynamic=False):
         if address not in self.queues:
-            self.queues[address] = Queue()
+            self.queues[address] = Queue(self, address, dynamic)
+            self._verbose_print(f"{address=}: Created")
         return self.queues[address]
 
+    def on_start(self, event):
+        self.acceptor = event.container.listen(self.url)
+
     def on_connection_opening(self, event):
         event.connection.offered_capabilities = 'ANONYMOUS-RELAY'
 
     def on_link_opening(self, event):
-        if event.link.is_sender:
-            if event.link.remote_source.dynamic:
-                address = str(uuid.uuid4())
-                event.link.source.address = address
-                q = Queue(True)
-                self.queues[address] = q
-                q.subscribe(event.link)
-            elif event.link.remote_source.address:
-                event.link.source.address = event.link.remote_source.address
-                self._queue(event.link.source.address).subscribe(event.link)
-        elif event.link.remote_target.address:
-            event.link.target.address = event.link.remote_target.address
+        link = event.link
+        if link.is_sender:
+            dynamic = link.remote_source.dynamic
+            if dynamic or link.remote_source.address:
+                address = str(uuid.uuid4()) if dynamic else link.remote_source.address
+                link.source.address = address
+                self._queue(address, dynamic).subscribe(link)
+                self._verbose_print(f"{link=}: Subscribed: {address=}")
+        elif link.remote_target.type == Terminus.COORDINATOR:
+            # Set up transaction coordinator
+            # Should check for compatible capabilities
+            # requested = link.remote_target.capabilities.get_object()
+            link.target.type = Terminus.COORDINATOR
+            link.target.copy(link.remote_target)
+        elif link.remote_target.address:
+            link.target.address = link.remote_target.address
 
     def _unsubscribe(self, link):
-        if link.source.address in self.queues and self.queues[link.source.address].unsubscribe(link):
-            del self.queues[link.source.address]
+        address = link.source.address
+        if address in self.queues:
+            q = self.queues[address]
+            q.unsubscribe(link)
+            self._verbose_print(f"{link=}: Unsubscribed: {address=}")
+            if q.removable():
+                del q
+                self._verbose_print(f"{address=}: Removed")
+
+    def _modify_delivery(self, delivery: Delivery, message: Message, address: str):
+        disposition = delivery.remote
+        # If not deliverable don't requeue
+        if disposition.undeliverable:
+            self._verbose_print(f"{delivery.tag=}: Modified: Undeliverable: {message.id=}")
+            # Don't requeue the message
+            return
+        # Check if we need to update the delivery count
+        if disposition.failed:
+            if message.delivery_count >= self.redelivery_limit:
+                self._verbose_print(f"{delivery.tag=}: Modified: Redelivery limit exceeded: {message.id=}")
+                # Don't requeue the message
+                return
+            # Update the delivery count
+            message.delivery_count += 1
+        # Requeue the message from the delivery
+        self._verbose_print(f"{delivery.tag=}: Modified: {message.id=} Requeued: {address=}")
+        self._queue(address).publish(message)
+
+    def delivery_update(self, delivery: Delivery, outcome: Union[int, DispositionType]):
+        unsettled_delivery = self.unsettled_deliveries[delivery]
+        message = unsettled_delivery.message
+        address = unsettled_delivery.address
+        if outcome == Disposition.ACCEPTED:
+            self._verbose_print(f"{delivery.tag=}: Accepted: {message.id=}")
+            # Delivery was accepted - nothing further to do
+        elif outcome == Disposition.REJECTED:
+            self._verbose_print(f"{delivery.tag=}: Rejected: {message.id=}")
+            # Delivery was rejected - nothing further to do
+        elif outcome == Disposition.RELEASED:
+            self._verbose_print(f"{delivery.tag=}: Released: {message.id=} Requeued: {address=}")
+            # Requeue the message from the delivery
+            self._queue(address).publish(message)
+        elif outcome == Disposition.MODIFIED:
+            self._modify_delivery(delivery, message, address)
+        delivery.settle()
+        del unsettled_delivery
+
+    def _declare_txn(self):
+        tid = bytes(uuid.uuid4().bytes)
+        self.txns[tid] = []
+        return tid
+
+    def _discharge_txn(self, tid, failed):
+        if not failed:
+            # Commit
+            self._verbose_print(f"{tid=}: Commit")
+            for action in self.txns[tid]:
+                action.commit(self)
+        else:
+            # Rollback
+            self._verbose_print(f"{tid=}: Rollback")
+            for action in self.txns[tid]:
+                action.rollback(self)
+        del self.txns[tid]
+
+    def _coordinator_message(self, msg, delivery):
+        body = msg.body
+        if isinstance(body, Described):
+            d = body.descriptor
+            if d == "amqp:declare:list":
+                # Allocate transaction id
+                tid = self._declare_txn()
+                self._verbose_print(f"{tid=}: Declare")
+                delivery.local = TransactionalDisposition(tid)
+            elif d == "amqp:discharge:list":
+                # Always accept commit/abort!
+                value = body.value
+                tid = bytes(value[0])
+                failed = bool(value[1])
+                if tid in self.txns:
+                    self._discharge_txn(tid, failed)
+                    delivery.update(Disposition.ACCEPTED)
+                else:
+                    self._verbose_print(f"{tid=}: Discharge unknown txn-id: {failed=}")
+                    delivery.local.condition = Condition('amqp:transaction:unknown-id')
+                    delivery.update(Disposition.REJECTED)
+        delivery.settle()
 
     def on_link_closing(self, event):
         if event.link.is_sender:
             self._unsubscribe(event.link)
 
-    def on_connection_closing(self, event):
-        self.remove_stale_consumers(event.connection)
-
-    def on_disconnected(self, event):
-        self.remove_stale_consumers(event.connection)
-
-    def remove_stale_consumers(self, connection):
+    def _remove_stale_consumers(self, connection):
         link = connection.link_head(Endpoint.REMOTE_ACTIVE)
         while link:
             if link.is_sender:
                 self._unsubscribe(link)
             link = link.next(Endpoint.REMOTE_ACTIVE)
 
+    def on_connection_closing(self, event):
+        self._remove_stale_consumers(event.connection)
+
+    def on_disconnected(self, event):
+        self._remove_stale_consumers(event.connection)
+
     def on_sendable(self, event):
-        self._queue(event.link.source.address).dispatch(event.link)
+        link: Link = event.link
+        self._queue(link.source.address).dispatch(link)
 
     def on_message(self, event):
-        address = event.link.target.address
+        link: Link = event.link
+        delivery: Delivery = event.delivery
+
+        message = event.message
+        if link.target.type == Terminus.COORDINATOR:
+            # Deal with special transaction messages
+            self._coordinator_message(message, delivery)
+            return
+
+        address = link.target.address
         if address is None:
-            address = event.message.address
-        self._queue(address).publish(event.message)
+            address = message.address
+
+        if address is None:
+            self._verbose_print("{message.id=}: Message without address")
+            delivery.local.condition = Condition('amqp:link:invalid-address')
+            delivery.update(Disposition.REJECTED)
+            delivery.settle()
+            return
+
+        # Is this a transactioned message?
+        disposition = delivery.remote
+        if disposition and disposition.type == Disposition.TRANSACTIONAL_STATE:
+            tid = disposition.id
+            if tid in self.txns:
+                self._verbose_print(f"{tid=}: Message: {message.id=}")
+                self.txns[tid].append(QueueMessage(delivery, message, address))
+                return
+            else:
+                self._verbose_print(f"{tid=}: Message: unknown txn-id")
+                delivery.local.condition = Condition('amqp:transaction:unknown-id')
+                delivery.update(Disposition.REJECTED)
+                delivery.settle()
+                return
+
+        self.publish(message, address)
+        delivery.update(Disposition.ACCEPTED)
+        delivery.settle()
+
+    def publish(self, message, address):
+        queue = self._queue(address)
+        queue.publish(message)
+        self._verbose_print(f"{message.id=} Queued: {queue.name=}")
+
+    def deliver(self, consumer, message):
+        delivery = consumer.send(message)
+        address = consumer.source.address
+        self.unsettled_deliveries[delivery] = UnsettledDelivery(address, message)
+        self._verbose_print(f"{delivery.tag=}: Sent: {message.id=} to {address=}")
+
+    def on_delivery_updated(self, event):
+        """Handle all delivery updates for the link."""
+        delivery = event.delivery
+        disposition = delivery.remote
+        # Is this a transactioned delivery update?
+        if disposition.type == Disposition.TRANSACTIONAL_STATE:
+            tid = disposition.id
+            outcome = disposition.outcome_type
+            if tid in self.txns:
+                self._verbose_print(f"{tid=}: Delivery update: outcome={outcome}")
+                self.txns[tid].append(DeliveryUpdate(delivery, outcome))
+                return
+            else:
+                self._verbose_print(f"{tid=}: Delivery update: unknown txn-id")
+                delivery.local.condition = Condition('amqp:transaction:unknown-id')
+                delivery.update(Disposition.REJECTED)
+        else:
+            self.delivery_update(delivery, disposition.type)
+        # The delivery is settled in every case except a valid transaction
+        # where the outcome is not yet known until the transaction is discharged.
+        delivery.settle()
 
 
 def main():
     parser = optparse.OptionParser(usage="usage: %prog [options]")
+    parser.add_option("-v", "--verbose", action="store_true", default=False,
+                      help="enable verbose output (default %default)")
     parser.add_option("-a", "--address", default="localhost:5672",
                       help="address router listens on (default %default)")
+    parser.add_option("-l", "--redelivery-limit", default=5,
+                      help="maximum redelivery attempts (default %default)")
     opts, args = parser.parse_args()
 
     try:
-        Container(Broker(opts.address)).run()
+        Container(Broker(opts.address, opts.verbose, opts.redelivery_limit)).run()
     except KeyboardInterrupt:
         pass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to