Piotr Kliczewski has uploaded a new change for review. Change subject: jsonrpc: executor based thread factory ......................................................................
jsonrpc: executor based thread factory Creating new thread for every request is not efficient so we introduce usage of the executor for request processing. Change-Id: I56b307633a8bf7e4aad8f87cc97a4129c9ed0970 Signed-off-by: pkliczewski <piotr.kliczew...@gmail.com> --- M lib/vdsm/config.py.in M lib/yajsonrpc/__init__.py M tests/jsonRpcHelper.py M tests/jsonRpcTests.py M vdsm/clientIF.py M vdsm/rpc/BindingJsonRpc.py 6 files changed, 76 insertions(+), 12 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/98/48198/1 diff --git a/lib/vdsm/config.py.in b/lib/vdsm/config.py.in index 49a210b..e25e0bc 100644 --- a/lib/vdsm/config.py.in +++ b/lib/vdsm/config.py.in @@ -249,6 +249,17 @@ ]), + # Section: [rpc] + ('rpc', [ + + ('worker_threads', '8', + 'Number of worker threads to serve jsonrpc server.'), + + ('tasks_per_worker', '10', + 'Max number of tasks which can be queued per workers.'), + + ]), + # Section: [mom] ('mom', [ diff --git a/lib/yajsonrpc/__init__.py b/lib/yajsonrpc/__init__.py index 53b4e2f..9ccce62 100644 --- a/lib/yajsonrpc/__init__.py +++ b/lib/yajsonrpc/__init__.py @@ -582,7 +582,19 @@ if self._threadFactory is None: self._serveRequest(ctx, request) else: - self._threadFactory(partial(self._serveRequest, ctx, request)) + try: + self._threadFactory(partial(self._serveRequest, ctx, request)) + except Exception as e: + self.log.exception("could not allocate request thread") + ctx.requestDone( + JsonRpcResponse( + None, + JsonRpcInternalError( + str(e) + ), + request.id + ) + ) def stop(self): self.log.info("Stopping JsonRPC Server") diff --git a/tests/jsonRpcHelper.py b/tests/jsonRpcHelper.py index 7c4ed40..30905a0 100644 --- a/tests/jsonRpcHelper.py +++ b/tests/jsonRpcHelper.py @@ -34,6 +34,8 @@ from protocoldetector import MultiProtocolAcceptor from yajsonrpc import JsonRpcClientPool from rpc.BindingJsonRpc import BindingJsonRpc +from vdsm import schedule +from vdsm import utils CERT_DIR = os.path.abspath(os.path.dirname(__file__)) @@ -80,7 +82,11 @@ xmlDetector = XmlDetector(xml_binding) acceptor.add_detector(xmlDetector) - json_binding = BindingJsonRpc(jsonBridge) + scheduler = schedule.Scheduler(name="test.Scheduler", + clock=utils.monotonic_time) + scheduler.start() + + json_binding = BindingJsonRpc(jsonBridge, scheduler) json_binding.start() stompDetector = StompDetector(json_binding) acceptor.add_detector(stompDetector) @@ -96,6 +102,7 @@ acceptor.stop() json_binding.stop() xml_binding.stop() + scheduler.stop(wait=False) @contextmanager diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py index 02c132f..85b3b30 100644 --- a/tests/jsonRpcTests.py +++ b/tests/jsonRpcTests.py @@ -21,6 +21,7 @@ from clientIF import clientIF from contextlib import contextmanager from monkeypatch import MonkeyPatch +from vdsm import executor from testrunner import VdsmTestCase as TestCaseBase, \ expandPermutations, \ @@ -66,6 +67,10 @@ def getInstance(): return FakeClientIf() + + +def dispatch(callable, timeout=None): + raise executor.TooManyTasks @expandPermutations @@ -176,3 +181,20 @@ res = self._callTimeout(client, "ping", [], CALL_ID, timeout=CALL_TIMEOUT) self.assertEquals(res, True) + + @MonkeyPatch(clientIF, 'getInstance', getInstance) + @MonkeyPatch(executor.Executor, 'dispatch', dispatch) + @permutations(PERMUTATIONS) + def testFullExecutor(self, ssl, type): + bridge = _DummyBridge() + with constructClient(self.log, bridge, ssl, type) as clientFactory: + with self._client(clientFactory) as client: + if type == "xml": + # TODO start using executor for xmlrpc + pass + else: + with self.assertRaises(JsonRpcError) as cm: + self._callTimeout(client, "no_method", [], CALL_ID) + + self.assertEquals(cm.exception.code, + JsonRpcInternalError().code) diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py index f300ee0..f65dd9d 100644 --- a/vdsm/clientIF.py +++ b/vdsm/clientIF.py @@ -65,7 +65,7 @@ _instance = None _instanceLock = threading.Lock() - def __init__(self, irs, log): + def __init__(self, irs, log, scheduler): """ Initialize the (single) clientIF instance @@ -87,6 +87,7 @@ self._generationID = str(uuid.uuid4()) self.mom = None self.bindings = {} + self._scheduler = scheduler if _glusterEnabled: self.gluster = gapi.GlusterApi(self, log) else: @@ -148,14 +149,14 @@ vmObj.cont() @classmethod - def getInstance(cls, irs=None, log=None): + def getInstance(cls, irs=None, log=None, scheduler=None): with cls._instanceLock: if cls._instance is None: if log is None: raise Exception("Logging facility is required to create " "the single clientIF instance") else: - cls._instance = clientIF(irs, log) + cls._instance = clientIF(irs, log, scheduler) return cls._instance def _createAcceptor(self, host, port): @@ -200,7 +201,7 @@ 'Please make sure it is installed.') else: bridge = Bridge.DynamicBridge() - json_binding = BindingJsonRpc(bridge) + json_binding = BindingJsonRpc(bridge, self._scheduler) self.bindings['jsonrpc'] = json_binding stomp_detector = StompDetector(json_binding) self._acceptor.add_detector(stomp_detector) diff --git a/vdsm/rpc/BindingJsonRpc.py b/vdsm/rpc/BindingJsonRpc.py index a5b5b4a..ddaad5e 100644 --- a/vdsm/rpc/BindingJsonRpc.py +++ b/vdsm/rpc/BindingJsonRpc.py @@ -19,18 +19,26 @@ from yajsonrpc import JsonRpcServer from yajsonrpc.stompReactor import StompReactor +from vdsm import executor +from vdsm.config import config -def _simpleThreadFactory(func): - t = threading.Thread(target=func) - t.setDaemon(False) - t.start() + +# TODO test what should be the default values +_THREADS = config.getint('rpc', 'worker_threads') +_TASK_PER_WORKER = config.getint('rpc', 'tasks_per_worker') +_TASKS = _THREADS * _TASK_PER_WORKER class BindingJsonRpc(object): log = logging.getLogger('BindingJsonRpc') - def __init__(self, bridge): - self._server = JsonRpcServer(bridge, _simpleThreadFactory) + def __init__(self, bridge, scheduler): + self._executor = executor.Executor(name="jsonrpc.Executor", + workers_count=_THREADS, + max_tasks=_TASKS, + scheduler=scheduler) + + self._server = JsonRpcServer(bridge, self._executor.dispatch) self._reactors = [] def add_socket(self, reactor, client_socket): @@ -46,6 +54,8 @@ return reactor def start(self): + self._executor.start() + t = threading.Thread(target=self._server.serve_requests, name='JsonRpcServer') t.setDaemon(True) @@ -62,3 +72,4 @@ self._server.stop() for reactor in self._reactors: reactor.stop() + self._executor.stop() -- To view, visit https://gerrit.ovirt.org/48198 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I56b307633a8bf7e4aad8f87cc97a4129c9ed0970 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: ovirt-3.5 Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com> Gerrit-Reviewer: automat...@ovirt.org _______________________________________________ vdsm-patches mailing list vdsm-patches@lists.fedorahosted.org https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches