Piotr Kliczewski has uploaded a new change for review. Change subject: stomp: subscription cleanup when connection lost ......................................................................
stomp: subscription cleanup when connection lost When we loose network connection we need to cleanup all the subscription that are assosiated with it. Change-Id: Iff05e4ff9d336dbfddf80002bad5bcd0dbac697f Signed-off-by: pkliczewski <[email protected]> --- M lib/yajsonrpc/stomp.py M lib/yajsonrpc/stompReactor.py 2 files changed, 20 insertions(+), 1 deletion(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/69/39969/1 diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py index 14c15ac..e74a8ca 100644 --- a/lib/yajsonrpc/stomp.py +++ b/lib/yajsonrpc/stomp.py @@ -384,6 +384,10 @@ def _milis(self): return int(round(monotonic_time() * 1000)) + def handle_close(self, dispatcher): + dispatcher.close() + self.connection.clean() + class AsyncClient(object): log = logging.getLogger("yajsonrpc.stomp.AsyncClient") diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py index 5e9941c..921d191 100644 --- a/lib/yajsonrpc/stompReactor.py +++ b/lib/yajsonrpc/stompReactor.py @@ -79,6 +79,12 @@ def queue_frame(self, frame): self._outbox.append(frame) + def remove_subscriptions(self): + for sub_id in self._sub_ids: + self._remove_subscription(sub_id) + + self._sub_ids.clear() + def _cmd_connect(self, dispatcher, frame): self.log.info("Processing CONNECT request") version = frame.headers.get("accept-version", None) @@ -141,12 +147,15 @@ dispatcher.connection) return + self._remove_subscription(sub_id) + del self._sub_ids[sub_id] + + def _remove_subscription(self, sub_id): subscription = self._sub_ids[sub_id] if not subscription: # ignore return - del self._sub_ids[sub_id] subs = self._sub_dests[subscription.destination] if len(subs) == 1: del self._sub_dests[subscription.destination] @@ -233,6 +242,7 @@ def close(self): self._dispatcher.close() + self.clean() def get_local_address(self): return self._socket.getsockname()[0] @@ -244,6 +254,10 @@ def handleMessage(self, data): if self._messageHandler is not None: self._messageHandler((self._server, self, data)) + + def clean(self): + if hasattr(self._aclient, 'remove_subscriptions'): + self._aclient.remove_subscriptions() class StompServer(object): @@ -491,6 +505,7 @@ def close(self): self._sub.unsubscribe() + self._client.close() def StompRpcClient( -- To view, visit https://gerrit.ovirt.org/39969 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Iff05e4ff9d336dbfddf80002bad5bcd0dbac697f Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Piotr Kliczewski <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
