This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new d1ce340cb PROTON-2890: [Python examples] Broker: Improve transactional
link handling
d1ce340cb is described below
commit d1ce340cb257354a716472a95d173a0cb182f149
Author: Andrew Stitcher <[email protected]>
AuthorDate: Thu May 1 21:02:54 2025 -0400
PROTON-2890: [Python examples] Broker: Improve transactional link handling
* Use new link iterator API
* Abort any active transactions when coordinator link closed
---
python/examples/broker.py | 40 ++++++++++++++++++++++++++++++----------
1 file changed, 30 insertions(+), 10 deletions(-)
diff --git a/python/examples/broker.py b/python/examples/broker.py
index b350654a3..9d2cb38f7 100755
--- a/python/examples/broker.py
+++ b/python/examples/broker.py
@@ -156,6 +156,7 @@ class Broker(MessagingHandler):
# requested = link.remote_target.capabilities.get_object()
link.target.type = Terminus.COORDINATOR
link.target.copy(link.remote_target)
+ link._txns = set()
elif link.remote_target.address:
link.target.address = link.remote_target.address
@@ -228,20 +229,23 @@ class Broker(MessagingHandler):
def _coordinator_message(self, msg, delivery):
body = msg.body
if isinstance(body, Described):
+ link = delivery.link
d = body.descriptor
- if d == "amqp:declare:list":
+ if d == "amqp:declare:list" or d == 0x31:
# Allocate transaction id
tid = self._declare_txn()
self._verbose_print(f"{tid=}: Declare")
delivery.local = TransactionalDisposition(tid)
- elif d == "amqp:discharge:list":
+ link._txns.add(tid)
+ elif d == "amqp:discharge:list" or d == 0x32:
# Always accept commit/abort!
value = body.value
tid = bytes(value[0])
failed = bool(value[1])
- if tid in self.txns:
+ if tid in link._txns:
self._discharge_txn(tid, failed)
delivery.update(Disposition.ACCEPTED)
+ link._txns.remove(tid)
else:
self._verbose_print(f"{tid=}: Discharge unknown txn-id:
{failed=}")
delivery.local.condition =
Condition('amqp:transaction:unknown-id')
@@ -249,21 +253,37 @@ class Broker(MessagingHandler):
delivery.settle()
def on_link_closing(self, event):
- if event.link.is_sender:
- self._unsubscribe(event.link)
+ link = event.link
+ if link.is_sender:
+ self._unsubscribe(link)
+ elif link.target.type == Terminus.COORDINATOR:
+ # Abort any remaining active transactions
+ for tid in link._txns:
+ self._discharge_txn(tid, failed=True)
+ link._txns.clear()
def _remove_stale_consumers(self, connection):
- link = connection.link_head(Endpoint.REMOTE_ACTIVE)
- while link:
+ for link in connection.links(Endpoint.REMOTE_ACTIVE):
if link.is_sender:
self._unsubscribe(link)
- link = link.next(Endpoint.REMOTE_ACTIVE)
+
+ def _abort_active_transactions(self, connection):
+ for link in connection.links(Endpoint.LOCAL_ACTIVE):
+ if link.target.type == Terminus.COORDINATOR:
+ # Abort any remaining active transactions
+ for tid in link._txns:
+ self._discharge_txn(tid, failed=True)
+ link._txns.clear()
def on_connection_closing(self, event):
- self._remove_stale_consumers(event.connection)
+ connection = event.connection
+ self._remove_stale_consumers(connection)
+ self._abort_active_transactions(connection)
def on_disconnected(self, event):
- self._remove_stale_consumers(event.connection)
+ connection = event.connection
+ self._remove_stale_consumers(connection)
+ self._abort_active_transactions(connection)
def on_sendable(self, event):
link: Link = event.link
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]