Piotr Kliczewski has uploaded a new change for review.

Change subject: stomp: subscription cleanup when connection lost
......................................................................

stomp: subscription cleanup when connection lost

When we lose the network connection we need to clean up all the
subscriptions that are associated with it.


Change-Id: Iff05e4ff9d336dbfddf80002bad5bcd0dbac697f
Signed-off-by: pkliczewski <[email protected]>
---
M lib/yajsonrpc/stomp.py
M lib/yajsonrpc/stompReactor.py
2 files changed, 20 insertions(+), 1 deletion(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/69/39969/1

diff --git a/lib/yajsonrpc/stomp.py b/lib/yajsonrpc/stomp.py
index 14c15ac..e74a8ca 100644
--- a/lib/yajsonrpc/stomp.py
+++ b/lib/yajsonrpc/stomp.py
@@ -384,6 +384,10 @@
     def _milis(self):
         return int(round(monotonic_time() * 1000))
 
+    def handle_close(self, dispatcher):
+        dispatcher.close()
+        self.connection.clean()
+
 
 class AsyncClient(object):
     log = logging.getLogger("yajsonrpc.stomp.AsyncClient")
diff --git a/lib/yajsonrpc/stompReactor.py b/lib/yajsonrpc/stompReactor.py
index 5e9941c..921d191 100644
--- a/lib/yajsonrpc/stompReactor.py
+++ b/lib/yajsonrpc/stompReactor.py
@@ -79,6 +79,12 @@
     def queue_frame(self, frame):
         self._outbox.append(frame)
 
+    def remove_subscriptions(self):
+        for sub_id in self._sub_ids:
+            self._remove_subscription(sub_id)
+
+        self._sub_ids.clear()
+
     def _cmd_connect(self, dispatcher, frame):
         self.log.info("Processing CONNECT request")
         version = frame.headers.get("accept-version", None)
@@ -141,12 +147,15 @@
                              dispatcher.connection)
             return
 
+        self._remove_subscription(sub_id)
+        del self._sub_ids[sub_id]
+
+    def _remove_subscription(self, sub_id):
         subscription = self._sub_ids[sub_id]
         if not subscription:
             # ignore
             return
 
-        del self._sub_ids[sub_id]
         subs = self._sub_dests[subscription.destination]
         if len(subs) == 1:
             del self._sub_dests[subscription.destination]
@@ -233,6 +242,7 @@
 
     def close(self):
         self._dispatcher.close()
+        self.clean()
 
     def get_local_address(self):
         return self._socket.getsockname()[0]
@@ -244,6 +254,10 @@
     def handleMessage(self, data):
         if self._messageHandler is not None:
             self._messageHandler((self._server, self, data))
+
+    def clean(self):
+        if hasattr(self._aclient, 'remove_subscriptions'):
+            self._aclient.remove_subscriptions()
 
 
 class StompServer(object):
@@ -491,6 +505,7 @@
 
     def close(self):
         self._sub.unsubscribe()
+        self._client.close()
 
 
 def StompRpcClient(


-- 
To view, visit https://gerrit.ovirt.org/39969
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iff05e4ff9d336dbfddf80002bad5bcd0dbac697f
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Piotr Kliczewski <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to