This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push: new d1ce340cb PROTON-2890: [Python examples] Broker: Improve transactional link handling d1ce340cb is described below commit d1ce340cb257354a716472a95d173a0cb182f149 Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Thu May 1 21:02:54 2025 -0400 PROTON-2890: [Python examples] Broker: Improve transactional link handling * Use new link iterator API * Abort any active transactions when coordinator link closed --- python/examples/broker.py | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/python/examples/broker.py b/python/examples/broker.py index b350654a3..9d2cb38f7 100755 --- a/python/examples/broker.py +++ b/python/examples/broker.py @@ -156,6 +156,7 @@ class Broker(MessagingHandler): # requested = link.remote_target.capabilities.get_object() link.target.type = Terminus.COORDINATOR link.target.copy(link.remote_target) + link._txns = set() elif link.remote_target.address: link.target.address = link.remote_target.address @@ -228,20 +229,23 @@ class Broker(MessagingHandler): def _coordinator_message(self, msg, delivery): body = msg.body if isinstance(body, Described): + link = delivery.link d = body.descriptor - if d == "amqp:declare:list": + if d == "amqp:declare:list" or d == 0x31: # Allocate transaction id tid = self._declare_txn() self._verbose_print(f"{tid=}: Declare") delivery.local = TransactionalDisposition(tid) - elif d == "amqp:discharge:list": + link._txns.add(tid) + elif d == "amqp:discharge:list" or d == 0x32: # Always accept commit/abort! value = body.value tid = bytes(value[0]) failed = bool(value[1]) - if tid in self.txns: + if tid in link._txns: self._discharge_txn(tid, failed) delivery.update(Disposition.ACCEPTED) + link._txns.remove(tid) else: self._verbose_print(f"{tid=}: Discharge unknown txn-id: {failed=}") delivery.local.condition = Condition('amqp:transaction:unknown-id') @@ -249,21 +253,37 @@ class Broker(MessagingHandler): delivery.settle() def on_link_closing(self, event): - if event.link.is_sender: - self._unsubscribe(event.link) + link = event.link + if link.is_sender: + self._unsubscribe(link) + elif link.target.type == Terminus.COORDINATOR: + # Abort any remaining active transactions + for tid in link._txns: + self._discharge_txn(tid, failed=True) + link._txns.clear() def _remove_stale_consumers(self, connection): - link = connection.link_head(Endpoint.REMOTE_ACTIVE) - while link: + for link in connection.links(Endpoint.REMOTE_ACTIVE): if link.is_sender: self._unsubscribe(link) - link = link.next(Endpoint.REMOTE_ACTIVE) + + def _abort_active_transactions(self, connection): + for link in connection.links(Endpoint.LOCAL_ACTIVE): + if link.target.type == Terminus.COORDINATOR: + # Abort any remaining active transactions + for tid in link._txns: + self._discharge_txn(tid, failed=True) + link._txns.clear() def on_connection_closing(self, event): - self._remove_stale_consumers(event.connection) + connection = event.connection + self._remove_stale_consumers(connection) + self._abort_active_transactions(connection) def on_disconnected(self, event): - self._remove_stale_consumers(event.connection) + connection = event.connection + self._remove_stale_consumers(connection) + self._abort_active_transactions(connection) def on_sendable(self, event): link: Link = event.link --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org