Saggi Mizrahi has uploaded a new change for review.

Change subject: Fix race in ProtonReactor and add better delivery semantics
......................................................................

Fix race in ProtonReactor and add better delivery semantics

Change-Id: Ie53d6f4b8a119f8a9e366b717c22ba38bcc99e80
Signed-off-by: Saggi Mizrahi <[email protected]>
---
M vdsm_api/jsonrpc/protonReactor.py
1 file changed, 60 insertions(+), 9 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/56/10256/1

diff --git a/vdsm_api/jsonrpc/protonReactor.py b/vdsm_api/jsonrpc/protonReactor.py
index f5363ad..75c5e4e 100644
--- a/vdsm_api/jsonrpc/protonReactor.py
+++ b/vdsm_api/jsonrpc/protonReactor.py
@@ -16,6 +16,7 @@
 import logging
 import uuid
 from Queue import Queue, Empty
+import time
 
 import proton
 
@@ -27,10 +28,11 @@
 class ProtonContext(object):
     log = logging.getLogger("jsonrpc.ProtonContext")
 
-    def __init__(self, reactor, messageQueue, msg):
+    def __init__(self, reactor, messageQueue, cxtr, msg):
         self._reactor = reactor
         self._msg = msg
         self._mq = messageQueue
+        self._cxtr = cxtr
 
     @property
     def data(self):
@@ -42,13 +44,14 @@
         msg.body = data
         self._mq.put_nowait(msg)
         self.log.debug("Message Queued")
+        self._reactor._activate(self._cxtr, proton.PN_CONNECTOR_WRITABLE)
         self._reactor._wakeup()
 
 
 class ProtonReactor(object):
     log = logging.getLogger("jsonrpc.ProtonReactor")
 
-    def __init__(self, address, messageHandler):
+    def __init__(self, address, messageHandler, deliveryTimeout=5):
         self._messageHandler = messageHandler
         host, port = address
         self.host = host
@@ -59,6 +62,11 @@
         self._driver = proton.pn_driver()
 
         self._sessionContexts = []
+        self._deliveryTimeout = deliveryTimeout
+        self._activationQeue = Queue()
+
+    def _activate(self, cxtr, cond):
+        self._activationQeue.put_nowait((cxtr, cond))
 
     def _convertTimeout(self, timeout):
         """
@@ -182,18 +190,46 @@
             proton.pn_link_open(link)
             link = proton.pn_link_next(link, proton.PN_LOCAL_UNINIT)
 
-    def _processDeliveries(self, conn):
+    def _processDeliveries(self, conn, cxtr):
         delivery = proton.pn_work_head(conn)
         while delivery:
             self.log.debug("Process delivery %s" %
                            proton.pn_delivery_tag(delivery))
 
             if proton.pn_delivery_readable(delivery):
-                self._processIncoming(delivery)
+                self._processIncoming(delivery, cxtr)
             elif proton.pn_delivery_writable(delivery):
                 self._processOutgoing(delivery)
 
             delivery = proton.pn_work_next(delivery)
+
+    def _cleanDeliveries(self, conn):
+        link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE))
+        while link:
+            d = proton.pn_unsettled_head(link)
+            while d:
+                _next = proton.pn_unsettled_next(d)
+                disp = proton.pn_delivery_remote_state(d)
+                age = time.time() - proton.pn_delivery_get_context(d)
+                self.log.debug("Checking delivery")
+                if disp and disp != proton.PN_ACCEPTED:
+                    self.log.warn("Message was not accepted by remote end")
+
+                if disp and proton.pn_delivery_settled(d):
+                    self.log.debug("Message settled by remote end")
+                    proton.pn_delivery_settle(d)
+
+                elif age > self._deliveryTimeout:
+                    self.log.warn("Delivary not settled by remote host")
+                    proton.pn_delivery_settle(d)
+
+                elif proton.pn_link_state(link) & proton.PN_REMOTE_CLOSED:
+                    self.log.warn("Link closed before settling message")
+                    proton.pn_delivery_settle(d)
+
+                d = _next
+
+            link = proton.pn_link_next(link, (proton.PN_LOCAL_ACTIVE))
 
     def _cleanLinks(self, conn):
         link = proton.pn_link_head(conn, (proton.PN_LOCAL_ACTIVE |
@@ -249,6 +285,10 @@
             else:
                 self.log.debug("Creating delivery")
                 proton.pn_link_set_context(sender, msg.encode())
+                if proton.pn_link_credit(sender) == 0:
+                    self.log.debug("Not enough credit, waiting")
+                    continue
+
                 proton.pn_delivery(sender,
                                    "response-delivery-%s" % str(uuid.uuid4()))
 
@@ -260,7 +300,8 @@
         self._openPendingSessions(conn)
         self._openLinks(conn)
         self._queueOutgoingDeliveries(conn)
-        self._processDeliveries(conn)
+        self._processDeliveries(conn, cxtr)
+        self._cleanDeliveries(conn)
         self._cleanLinks(conn)
         self._cleanSessions(conn)
 
@@ -269,7 +310,7 @@
             self.log.debug("Connection Closed")
             proton.pn_connection_close(conn)
 
-    def _processIncoming(self, delivery):
+    def _processIncoming(self, delivery, cxtr):
         link = proton.pn_delivery_link(delivery)
         ssn = proton.pn_link_session(link)
         msg = []
@@ -285,7 +326,8 @@
         msgObj.decode(msg)
         ctx = proton.pn_session_get_context(ssn)
         mq = ctx['mqueue']
-        self._messageHandler.handleMessage(ProtonContext(self, mq, msgObj))
+        self._messageHandler.handleMessage(ProtonContext(self, mq, cxtr,
+                                                         msgObj))
 
         proton.pn_delivery_settle(delivery)
         proton.pn_link_advance(link)
@@ -308,8 +350,7 @@
             else:
                 self.log.debug("Delivery finished")
                 proton.pn_link_set_context(link, "")
-                # We don't care if the delivery is successful or not
-                proton.pn_delivery_settle(delivery)
+                proton.pn_delivery_set_context(delivery, time.time())
                 proton.pn_link_advance(link)
 
     def start_listening(self):
@@ -319,10 +360,20 @@
             raise RuntimeError("Could not listen on %s:%s" % (self.host,
                                                               self.port))
 
+    def _emptyActivationQueue(self):
+        while True:
+            try:
+                args = self._activationQeue.get_nowait()
+            except Empty:
+                return
+            else:
+                proton.pn_connector_activate(*args)
+
     def process_requests(self):
         self._isRunning = True
         while self._isRunning:
             self._waitDriverEvent()
+            self._emptyActivationQueue()
             self._acceptConnectionRequests()
             self._processConnectors()
 


--
To view, visit http://gerrit.ovirt.org/10256
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ie53d6f4b8a119f8a9e366b717c22ba38bcc99e80
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to