PROTON-1534: BlockingConnection proper cleanup after LinkDetached exception
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/47c9ae01
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/47c9ae01
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/47c9ae01

Branch: refs/heads/go1
Commit: 47c9ae01dcbdbfffc0a4e78b4648171896d0f2a7
Parents: b9d64d6
Author: Clifford Jansen <cliffjan...@apache.org>
Authored: Thu Oct 12 14:04:15 2017 -0700
Committer: Clifford Jansen <cliffjan...@apache.org>
Committed: Thu Oct 12 14:05:50 2017 -0700

----------------------------------------------------------------------
 proton-c/bindings/python/proton/utils.py | 35 ++++++++++++++++++---------
 1 file changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/47c9ae01/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 13f29a7..c9a3654 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -22,6 +22,7 @@ from proton import ConnectionException, Delivery, Endpoint, Handler, Link, LinkE
 from proton import ProtonException, Timeout, Url
 from proton.reactor import Container
 from proton.handlers import MessagingHandler, IncomingMessageHandler
+from cproton import pn_reactor_collector, pn_collector_release
 
 
 class BlockingLink(object):
@@ -43,7 +44,8 @@ class BlockingLink(object):
     def _checkClosed(self):
         if self.link.state & Endpoint.REMOTE_CLOSED:
             self.link.close()
-            raise LinkDetached(self.link)
+            if not self.connection.closing:
+                raise LinkDetached(self.link)
 
     def close(self):
         self.link.close()
@@ -99,10 +101,12 @@ class Fetcher(MessagingHandler):
     def on_link_error(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
             event.link.close()
-            raise LinkDetached(event.link)
+            if not self.connection.closing:
+                raise LinkDetached(event.link)
 
     def on_connection_error(self, event):
-        raise ConnectionClosed(event.connection)
+        if not self.connection.closing:
+            raise ConnectionClosed(event.connection)
 
     @property
     def has_message(self):
@@ -214,6 +218,7 @@ class BlockingConnection(Handler):
         self.container.start()
         self.url = Url(url).defaults()
         self.conn = None
+        self.closing = False
         failed = True
         try:
             self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
@@ -239,19 +244,23 @@ class BlockingConnection(Handler):
             self, self.container.create_receiver(self.conn, address, name=name, dynamic=dynamic, handler=handler or fetcher, options=options), fetcher, credit=prefetch)
 
     def close(self):
-        if not self.conn:
+        # TODO: provide stronger interrupt protection on cleanup. See PEP 419
+        if self.closing:
             return
-        self.conn.close()
+        self.closing = True
+        self.container.errors = []
         try:
-            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
-                      msg="Closing connection")
+            if self.conn:
+                self.conn.close()
+                self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_ACTIVE),
+                          msg="Closing connection")
         finally:
             self.conn.free()
-            # For cleanup, reactor needs to process PN_CONNECTION_FINAL
-            # and all events with embedded contexts must be drained.
-            self.run() # will not block any more
+            # Nothing left to block on. Allow reactor to clean up.
+            self.run()
             self.conn = None
             self.container.global_handler = None # break circular ref: container to cadapter.on_error
+            pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive
             self.container = None
 
     def _is_closed(self):
@@ -293,12 +302,14 @@ class BlockingConnection(Handler):
     def on_link_remote_close(self, event):
         if event.link.state & Endpoint.LOCAL_ACTIVE:
             event.link.close()
-            raise LinkDetached(event.link)
+            if not self.closing:
+                raise LinkDetached(event.link)
 
     def on_connection_remote_close(self, event):
         if event.connection.state & Endpoint.LOCAL_ACTIVE:
             event.connection.close()
-            raise ConnectionClosed(event.connection)
+            if not self.closing:
+                raise ConnectionClosed(event.connection)
 
     def on_transport_tail_closed(self, event):
         self.on_transport_closed(event)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org