This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new d1ce340cb PROTON-2890: [Python examples] Broker: Improve transactional 
link handling
d1ce340cb is described below

commit d1ce340cb257354a716472a95d173a0cb182f149
Author: Andrew Stitcher <astitc...@apache.org>
AuthorDate: Thu May 1 21:02:54 2025 -0400

    PROTON-2890: [Python examples] Broker: Improve transactional link handling
    
    * Use new link iterator API
    * Abort any active transactions when coordinator link closed
---
 python/examples/broker.py | 40 ++++++++++++++++++++++++++++++----------
 1 file changed, 30 insertions(+), 10 deletions(-)

diff --git a/python/examples/broker.py b/python/examples/broker.py
index b350654a3..9d2cb38f7 100755
--- a/python/examples/broker.py
+++ b/python/examples/broker.py
@@ -156,6 +156,7 @@ class Broker(MessagingHandler):
             # requested = link.remote_target.capabilities.get_object()
             link.target.type = Terminus.COORDINATOR
             link.target.copy(link.remote_target)
+            link._txns = set()
         elif link.remote_target.address:
             link.target.address = link.remote_target.address
 
@@ -228,20 +229,23 @@ class Broker(MessagingHandler):
     def _coordinator_message(self, msg, delivery):
         body = msg.body
         if isinstance(body, Described):
+            link = delivery.link
             d = body.descriptor
-            if d == "amqp:declare:list":
+            if d == "amqp:declare:list" or d == 0x31:
                 # Allocate transaction id
                 tid = self._declare_txn()
                 self._verbose_print(f"{tid=}: Declare")
                 delivery.local = TransactionalDisposition(tid)
-            elif d == "amqp:discharge:list":
+                link._txns.add(tid)
+            elif d == "amqp:discharge:list" or d == 0x32:
                 # Always accept commit/abort!
                 value = body.value
                 tid = bytes(value[0])
                 failed = bool(value[1])
-                if tid in self.txns:
+                if tid in link._txns:
                     self._discharge_txn(tid, failed)
                     delivery.update(Disposition.ACCEPTED)
+                    link._txns.remove(tid)
                 else:
                     self._verbose_print(f"{tid=}: Discharge unknown txn-id: 
{failed=}")
                     delivery.local.condition = 
Condition('amqp:transaction:unknown-id')
@@ -249,21 +253,37 @@ class Broker(MessagingHandler):
         delivery.settle()
 
     def on_link_closing(self, event):
-        if event.link.is_sender:
-            self._unsubscribe(event.link)
+        link = event.link
+        if link.is_sender:
+            self._unsubscribe(link)
+        elif link.target.type == Terminus.COORDINATOR:
+            # Abort any remaining active transactions
+            for tid in link._txns:
+                self._discharge_txn(tid, failed=True)
+            link._txns.clear()
 
     def _remove_stale_consumers(self, connection):
-        link = connection.link_head(Endpoint.REMOTE_ACTIVE)
-        while link:
+        for link in connection.links(Endpoint.REMOTE_ACTIVE):
             if link.is_sender:
                 self._unsubscribe(link)
-            link = link.next(Endpoint.REMOTE_ACTIVE)
+
+    def _abort_active_transactions(self, connection):
+        for link in connection.links(Endpoint.LOCAL_ACTIVE):
+            if link.target.type == Terminus.COORDINATOR:
+                # Abort any remaining active transactions
+                for tid in link._txns:
+                    self._discharge_txn(tid, failed=True)
+                link._txns.clear()
 
     def on_connection_closing(self, event):
-        self._remove_stale_consumers(event.connection)
+        connection = event.connection
+        self._remove_stale_consumers(connection)
+        self._abort_active_transactions(connection)
 
     def on_disconnected(self, event):
-        self._remove_stale_consumers(event.connection)
+        connection = event.connection
+        self._remove_stale_consumers(connection)
+        self._abort_active_transactions(connection)
 
     def on_sendable(self, event):
         link: Link = event.link


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to