Saggi Mizrahi has uploaded a new change for review.

Change subject: [WIP] JsonRpcServer Threading support
......................................................................

[WIP] JsonRpcServer Threading support

Change-Id: I1510320f192f448ade41018d513dd10308b207d3
Signed-off-by: Saggi Mizrahi <[email protected]>
---
M vdsm/clientIF.py
M vdsm_api/BindingJsonRpc.py
M vdsm_api/jsonrpc/__init__.py
3 files changed, 83 insertions(+), 33 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/06/10206/1

diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index 231e0b4..04653bb 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -156,7 +156,8 @@
         schema = os.path.join(constants.P_VDSM, 'vdsmapi-schema.json')
         ip = self._getServerIP(config.get('addresses', 'management_ip'))
         port = config.getint('addresses', 'json_port')
-        conf = [('tcp', {"ip": ip, "port": port})]
+        conf = [('tcp', {"ip": ip, "port": port}),
+                ('amqp', {"host": ip})]
         self.bindings['json'] = BindingJsonRpc(DynamicBridge(schema), conf)
 
     def _prepareBindings(self):
diff --git a/vdsm_api/BindingJsonRpc.py b/vdsm_api/BindingJsonRpc.py
index f71491f..a41dcef 100644
--- a/vdsm_api/BindingJsonRpc.py
+++ b/vdsm_api/BindingJsonRpc.py
@@ -21,6 +21,11 @@
 
 from jsonrpc import JsonRpcServer
 from jsonrpc.tcpReactor import TCPReactor
+ProtonReactor = None
+try:
+    from jsonrpc.protonReactor import ProtonReactor
+except ImportError:
+    pass
 
 
 class BindingJsonRpc(object):
@@ -33,6 +38,11 @@
         for backendType, cfg in backendConfig:
             if backendType == "tcp":
                 reactors.append(self._createTcpReactor(cfg))
+            elif backendType == "amqp":
+                if ProtonReactor is None:
+                    continue
+
+                reactors.append(self._createProtonReactor(cfg))
 
         self._reactors = reactors
 
@@ -41,6 +51,11 @@
         port = cfg.get("port", 4044)
         return TCPReactor((address, port), self.server)
 
+    def _createProtonReactor(self, cfg):
+        address = cfg.get("host", "0.0.0.0")
+        port = cfg.get("port", 5672)
+        return TCPReactor((address, port), self.server)
+
     def start(self):
         for reactor in self._reactors:
             reactorName = reactor.__class__.__name__
diff --git a/vdsm_api/jsonrpc/__init__.py b/vdsm_api/jsonrpc/__init__.py
index d0ed044..f35a014 100644
--- a/vdsm_api/jsonrpc/__init__.py
+++ b/vdsm_api/jsonrpc/__init__.py
@@ -14,6 +14,7 @@
 # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
 import json
 import logging
+from Queue import Queue
 
 __all__ = ["tcpReactor"]
 
@@ -51,16 +52,19 @@
 
 
 class JsonRpcInternalError(JsonRpcError):
-    def __init__(self):
-        JsonRpcError.__init__(self, -32603,
-                              "Internal JSON-RPC error.")
+    def __init__(self, msg=None):
+        if msg is None:
+            msg = "Internal JSON-RPC error."
+        JsonRpcError.__init__(self, -32603, msg)
 
 
-class JsonRpcRequest(object):
-    def __init__(self, method, params, reqId):
-        self.method = method
+class _JsonRpcRequest(object):
+    def __init__(self, ctx, queue, methodName, params, reqId):
+        self.method = methodName
         self.params = params
         self.id = reqId
+        self._ctx = ctx
+        self._queue = queue
 
     def invokeFunction(self, func):
         if isinstance(self.params, list):
@@ -71,12 +75,28 @@
     def isNotification(self):
         return self.id is None
 
+    def sendReply(self, result, error):
+        # TBD: Should calling this for a notification raise an error or be
+        #      ignored?
+        self._queue.put_nowait(_JsonRpcResponse(self._ctx, result, error))
+
+
+class _JsonRpcResponse(object):
+    def __init__(self, ctx, result, error):
+        self._ctx = ctx
+        self._result = result
+        self._error = error
+
+    def generateRespnose(self):
+        pass
+
 
 class JsonRpcServer(object):
     log = logging.getLogger("JsonRpcServer")
 
     def __init__(self, bridge):
         self._bridge = bridge
+        self._workQueue = Queue()
 
     def _parseMessage(self, msg):
         try:
@@ -84,7 +104,7 @@
         except:
             raise JsonRpcParseError()
 
-    def _parseRequest(self, obj):
+    def _parseRequest(self, obj, ctx, queue):
         if obj.get("jsonrpc") != "2.0":
             raise JsonRpcInvalidRequestError()
 
@@ -100,7 +120,7 @@
         if not isinstance(params, (list, dict)):
             raise JsonRpcInvalidRequestError()
 
-        return JsonRpcRequest(method, params, reqId)
+        return _JsonRpcRequest(ctx, queue, method, params, reqId)
 
     def _jsonError2Response(self, err, req):
         respId = None
@@ -116,38 +136,52 @@
                           "result": result,
                           "id": req.id}, 'utf-8')
 
+    def _serveRequest(self, req):
+        mangledMethod = req.method.replace(".", "_")
+        self.log.debug("Looking for method '%s' in bridge",
+                       mangledMethod)
+        try:
+            method = getattr(self._bridge, mangledMethod)
+        except AttributeError:
+            raise JsonRpcMethodNotFoundError()
+        else:
+            res = req.invokeFunction(method)
+            resp = self._generateResponse(req, res)
+
+    def serve_requests(self):
+        obj = self._queue.get()
+        while True:
+            if obj is None:
+                break
+
+            if isinstance(obj, JsonRpcRequest):
+                self._serveRequest(obj)
+            else:
+                self._processResponse(obj)
+
     def handleMessage(self, msgCtx):
         #TODO: support batch requests
         req = None
-        resp = None
+        error = None
         try:
             obj = self._parseMessage(msgCtx.data)
-            req = self._parseRequest(obj)
+            req = self._parseRequest(obj, msgCtx, self._workQueue)
 
-            mangledMethod = req.method.replace(".", "_")
-            self.log.debug("Looking for method '%s' in bridge",
-                           mangledMethod)
-            try:
-                method = getattr(self._bridge, mangledMethod)
-            except AttributeError:
-                raise JsonRpcMethodNotFoundError()
-            else:
-                res = req.invokeFunction(method)
-                resp = self._generateResponse(req, res)
-
-        except JsonRpcError as e:
-                resp = self._jsonError2Response(e, req)
-                self.log.error("Error processing request", exc_info=True)
-        except:
-                resp = self._jsonError2Response(JsonRpcInternalError(), req)
-                self.log.error("Unexpected error while processing request",
-                               exc_info=True)
-
-        if resp is None:
+            self._queue.put_nowait(req)
             return
 
-        # Notification don't respond even on errors
+        except JsonRpcError as e:
+            self.log.error("Error processing request", exc_info=True)
+            error = e
+        except:
+            self.log.error("Unexpected error while processing request",
+                               exc_info=True)
+
+            error = JsonRpcInternalError()
+
+        # Notification, don't respond even on errors
         if req is not None and req.isNotification():
             return
 
-        msgCtx.sendReply(resp)
+        self._workQueue.put_nowait(_JsonRpcResponse(msgCtx, None, e))
+


--
To view, visit http://gerrit.ovirt.org/10206
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1510320f192f448ade41018d513dd10308b207d3
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to