Saggi Mizrahi has uploaded a new change for review.

Change subject: [WIP] JsonRpcServer Threading support
......................................................................

[WIP] JsonRpcServer Threading support Change-Id: I1510320f192f448ade41018d513dd10308b207d3 Signed-off-by: Saggi Mizrahi <[email protected]> --- M vdsm/clientIF.py M vdsm_api/BindingJsonRpc.py M vdsm_api/jsonrpc/__init__.py 3 files changed, 83 insertions(+), 33 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/06/10206/1 diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py index 231e0b4..04653bb 100644 --- a/vdsm/clientIF.py +++ b/vdsm/clientIF.py @@ -156,7 +156,8 @@ schema = os.path.join(constants.P_VDSM, 'vdsmapi-schema.json') ip = self._getServerIP(config.get('addresses', 'management_ip')) port = config.getint('addresses', 'json_port') - conf = [('tcp', {"ip": ip, "port": port})] + conf = [('tcp', {"ip": ip, "port": port}), + ('amqp', {"host": ip})] self.bindings['json'] = BindingJsonRpc(DynamicBridge(schema), conf) def _prepareBindings(self): diff --git a/vdsm_api/BindingJsonRpc.py b/vdsm_api/BindingJsonRpc.py index f71491f..a41dcef 100644 --- a/vdsm_api/BindingJsonRpc.py +++ b/vdsm_api/BindingJsonRpc.py @@ -21,6 +21,11 @@ from jsonrpc import JsonRpcServer from jsonrpc.tcpReactor import TCPReactor +ProtonReactor = None +try: + from jsonrpc.protonReactor import ProtonReactor +except ImportError: + pass class BindingJsonRpc(object): @@ -33,6 +38,11 @@ for backendType, cfg in backendConfig: if backendType == "tcp": reactors.append(self._createTcpReactor(cfg)) + elif backendType == "amqp": + if ProtonReactor is None: + continue + + reactors.append(self._createProtonReactor(cfg)) self._reactors = reactors @@ -41,6 +51,11 @@ port = cfg.get("port", 4044) return TCPReactor((address, port), self.server) + def _createProtonReactor(self, cfg): + address = cfg.get("host", "0.0.0.0") + port = cfg.get("port", 5672) + return TCPReactor((address, port), self.server) + def start(self): for reactor in self._reactors: reactorName = reactor.__class__.__name__ diff --git a/vdsm_api/jsonrpc/__init__.py b/vdsm_api/jsonrpc/__init__.py index d0ed044..f35a014 100644 --- 
a/vdsm_api/jsonrpc/__init__.py +++ b/vdsm_api/jsonrpc/__init__.py @@ -14,6 +14,7 @@ # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import json import logging +from Queue import Queue __all__ = ["tcpReactor"] @@ -51,16 +52,19 @@ class JsonRpcInternalError(JsonRpcError): - def __init__(self): - JsonRpcError.__init__(self, -32603, - "Internal JSON-RPC error.") + def __init__(self, msg=None): + if msg is None: + msg = "Internal JSON-RPC error." + JsonRpcError.__init__(self, -32603, msg) -class JsonRpcRequest(object): - def __init__(self, method, params, reqId): - self.method = method +class _JsonRpcRequest(object): + def __init__(self, ctx, queue, methodName, params, reqId): + self.method = methodName self.params = params self.id = reqId + self._ctx = ctx + self._queue = queue def invokeFunction(self, func): if isinstance(self.params, list): @@ -71,12 +75,28 @@ def isNotification(self): return self.id is None + def sendReply(self, result, error): + # TBD: Should calling this for a notification raise an error or be + # ignored + self._queue.put_nowait(_JsonRpcResponse(self._ctx, result, error)) + + +class _JsonRpcResponse(object): + def __init__(self, ctx, result, error): + self._ctx = ctx + self._result = result + self._error = error + + def generateRespnose(self): + pass + class JsonRpcServer(object): log = logging.getLogger("JsonRpcServer") def __init__(self, bridge): self._bridge = bridge + self._workQueue = Queue() def _parseMessage(self, msg): try: @@ -84,7 +104,7 @@ except: raise JsonRpcParseError() - def _parseRequest(self, obj): + def _parseRequest(self, obj, ctx, queue): if obj.get("jsonrpc") != "2.0": raise JsonRpcInvalidRequestError() @@ -100,7 +120,7 @@ if not isinstance(params, (list, dict)): raise JsonRpcInvalidRequestError() - return JsonRpcRequest(method, params, reqId) + return _JsonRpcRequest(ctx, queue, method, params, reqId) def _jsonError2Response(self, err, req): respId = None @@ -116,38 +136,52 @@ "result": result, 
"id": req.id}, 'utf-8') + def _serveRequest(self, req): + mangledMethod = req.method.replace(".", "_") + self.log.debug("Looking for method '%s' in bridge", + mangledMethod) + try: + method = getattr(self._bridge, mangledMethod) + except AttributeError: + raise JsonRpcMethodNotFoundError() + else: + res = req.invokeFunction(method) + resp = self._generateResponse(req, res) + + def serve_requests(self): + obj = self._queue.get() + while True: + if obj is None: + break + + if isinstance(obj, JsonRpcRequest): + self._serveRequest(obj) + else: + self._processResponse(obj) + def handleMessage(self, msgCtx): #TODO: support batch requests req = None - resp = None + error = None try: obj = self._parseMessage(msgCtx.data) - req = self._parseRequest(obj) + req = self._parseRequest(obj, msgCtx, self._workQueue) - mangledMethod = req.method.replace(".", "_") - self.log.debug("Looking for method '%s' in bridge", - mangledMethod) - try: - method = getattr(self._bridge, mangledMethod) - except AttributeError: - raise JsonRpcMethodNotFoundError() - else: - res = req.invokeFunction(method) - resp = self._generateResponse(req, res) - - except JsonRpcError as e: - resp = self._jsonError2Response(e, req) - self.log.error("Error processing request", exc_info=True) - except: - resp = self._jsonError2Response(JsonRpcInternalError(), req) - self.log.error("Unexpected error while processing request", - exc_info=True) - - if resp is None: + self._queue.put_nowait(req) return - # Notification don't respond even on errors + except JsonRpcError as e: + self.log.error("Error processing request", exc_info=True) + error = e + except: + self.log.error("Unexpected error while processing request", + exc_info=True) + + error = JsonRpcInternalError() + + # Notification, don't respond even on errors if req is not None and req.isNotification(): return - msgCtx.sendReply(resp) + self._workQueue.put_nowait(_JsonRpcResponse(msgCtx, None, e)) + -- To view, visit http://gerrit.ovirt.org/10206 To unsubscribe, 
visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I1510320f192f448ade41018d513dd10308b207d3
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Saggi Mizrahi <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
