Saggi Mizrahi has uploaded a new change for review.

Change subject: Run JsonRpcServer requests in their own thread instead of the transport thread
......................................................................
Run JsonRpcServer requests in their own thread instead of the transport thread This makes sure there is a single sync point for all json-rpc requests and that handling a request never blocks the transport for processing messages. Change-Id: Ifccf3fd0e3b1bec7db475b2674cff09bb920738e Signed-off-by: Saggi Mizrahi <[email protected]> --- M tests/jsonRpcTests.py M vdsm_api/BindingJsonRpc.py M vdsm_api/jsonrpc/__init__.py 3 files changed, 112 insertions(+), 34 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/52/10252/1 diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py index b495541..da8ff5d 100644 --- a/tests/jsonRpcTests.py +++ b/tests/jsonRpcTests.py @@ -116,10 +116,17 @@ t.setDaemon(True) t.start() + t = threading.Thread(target=server.serve_requests) + t.setDaemon(True) + t.start() + def jsonClientFactory(): return JsonRpcClient(clientFactory()) - yield server, jsonClientFactory + try: + yield server, jsonClientFactory + finally: + server.stop() class _EchoMessageHandler(object): diff --git a/vdsm_api/BindingJsonRpc.py b/vdsm_api/BindingJsonRpc.py index 49f45df..645428e 100644 --- a/vdsm_api/BindingJsonRpc.py +++ b/vdsm_api/BindingJsonRpc.py @@ -71,6 +71,12 @@ t.setDaemon(True) t.start() + t = threading.Thread(target=self.server.serve_requests, + name='JsonRpc (Rquest Processing)') + t.setDaemon(True) + t.start() + def prepareForShutdown(self): + self.server.stop() for reactor in self._reactors: reactor.stop() diff --git a/vdsm_api/jsonrpc/__init__.py b/vdsm_api/jsonrpc/__init__.py index f12c808..8e58ffe 100644 --- a/vdsm_api/jsonrpc/__init__.py +++ b/vdsm_api/jsonrpc/__init__.py @@ -14,6 +14,7 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import json import logging +from Queue import Queue __all__ = ["tcpReactor"] @@ -57,11 +58,13 @@ JsonRpcError.__init__(self, -32603, msg) -class JsonRpcRequest(object): - def __init__(self, method, params, reqId): - self.method = method +class _JsonRpcRequest(object): + 
def __init__(self, ctx, queue, methodName, params, reqId): + self.method = methodName self.params = params self.id = reqId + self._ctx = ctx + self._queue = queue def invokeFunction(self, func): if isinstance(self.params, list): @@ -72,12 +75,27 @@ def isNotification(self): return self.id is None + def sendReply(self, result, error): + # TBD: Should calling this for a notification raise an error or be + # ignored + self._queue.put_nowait(_JsonRpcResponse(self._ctx, self.id, result, + error)) + + +class _JsonRpcResponse(object): + def __init__(self, ctx, reqId, result, error): + self.ctx = ctx + self.result = result + self.error = error + self.id = reqId + class JsonRpcServer(object): log = logging.getLogger("JsonRpcServer") def __init__(self, bridge): self._bridge = bridge + self._workQueue = Queue() def _parseMessage(self, msg): try: @@ -85,7 +103,7 @@ except: raise JsonRpcParseError() - def _parseRequest(self, obj): + def _parseRequest(self, obj, ctx, queue): if obj.get("jsonrpc") != "2.0": raise JsonRpcInvalidRequestError() @@ -101,7 +119,7 @@ if not isinstance(params, (list, dict)): raise JsonRpcInvalidRequestError() - return JsonRpcRequest(method, params, reqId) + return _JsonRpcRequest(ctx, queue, method, params, reqId) def _jsonError2Response(self, err, req): respId = None @@ -112,43 +130,90 @@ "error": {"code": err.code, "message": err.message}, "id": respId}) - def _generateResponse(self, req, result): - return json.dumps({"jsonrpc": "2.0", - "result": result, - "id": req.id}, 'utf-8') + def _generateResponse(self, resp): + res = {"jsonrpc": "2.0", + "id": resp.id} + if resp.error is not None: + res['error'] = {'code': resp.error.code, + 'message': resp.error.message} + else: + res['result'] = resp.result + + return json.dumps(res, 'utf-8') + + def _serveRequest(self, req): + mangledMethod = req.method.replace(".", "_") + self.log.debug("Looking for method '%s' in bridge", + mangledMethod) + try: + method = getattr(self._bridge, mangledMethod) + except 
AttributeError: + req.sendReply(None, JsonRpcMethodNotFoundError()) + else: + try: + res = req.invokeFunction(method) + except JsonRpcError as e: + req.sendReply(None, e) + except Exception as e: + req.sendReply(None, JsonRpcInternalError(str(e))) + else: + return req.sendReply(res, None) + + def _processResponse(self, resp): + try: + msg = self._generateResponse(resp) + except Exception as e: + # Probably result failed to be serialized as json + errResp = _JsonRpcResponse(resp.ctx, + resp.id, + None, + JsonRpcInternalError(str(e))) + + msg = self._generateResponse(errResp) + + resp.ctx.sendReply(msg) + + def serve_requests(self): + while True: + obj = self._workQueue.get() + if obj is None: + break + + if isinstance(obj, _JsonRpcRequest): + self._serveRequest(obj) + else: + self._processResponse(obj) def handleMessage(self, msgCtx): #TODO: support batch requests req = None - resp = None + error = None try: obj = self._parseMessage(msgCtx.data) - req = self._parseRequest(obj) + req = self._parseRequest(obj, msgCtx, self._workQueue) - mangledMethod = req.method.replace(".", "_") - self.log.debug("Looking for method '%s' in bridge", - mangledMethod) - try: - method = getattr(self._bridge, mangledMethod) - except AttributeError: - raise JsonRpcMethodNotFoundError() - else: - res = req.invokeFunction(method) - resp = self._generateResponse(req, res) - - except JsonRpcError as e: - resp = self._jsonError2Response(e, req) - self.log.error("Error processing request", exc_info=True) - except: - resp = self._jsonError2Response(JsonRpcInternalError(), req) - self.log.error("Unexpected error while processing request", - exc_info=True) - - if resp is None: + self._workQueue.put_nowait(req) return - # Notification don't respond even on errors + except JsonRpcError as e: + self.log.error("Error processing request", exc_info=True) + error = e + except: + self.log.error("Unexpected error while processing request", + exc_info=True) + + error = JsonRpcInternalError() + + # 
Notification, don't respond even on errors if req is not None and req.isNotification(): return - msgCtx.sendReply(resp) + if req is None: + resp = _JsonRpcResponse(msgCtx, None, None, error) + else: + resp = _JsonRpcResponse(msgCtx, req.id, None, error) + + self._workQueue.put_nowait(resp) + + def stop(self): + self._workQueue.put_nowait(None) -- To view, visit http://gerrit.ovirt.org/10252 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ifccf3fd0e3b1bec7db475b2674cff09bb920738e Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Saggi Mizrahi <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
