Piotr Kliczewski has uploaded a new change for review.

Change subject: jsonrpc: executor based thread factory
......................................................................

jsonrpc: executor based thread factory

Creating a new thread for every request is inefficient, so we now use
an executor (a bounded pool of worker threads) for request processing.

Change-Id: I56b307633a8bf7e4aad8f87cc97a4129c9ed0970
Signed-off-by: pkliczewski <piotr.kliczew...@gmail.com>
---
M lib/vdsm/config.py.in
M lib/yajsonrpc/__init__.py
M tests/jsonRpcHelper.py
M tests/jsonRpcTests.py
M vdsm/clientIF.py
M vdsm/rpc/BindingJsonRpc.py
6 files changed, 76 insertions(+), 12 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/98/48198/1

diff --git a/lib/vdsm/config.py.in b/lib/vdsm/config.py.in
index 49a210b..e25e0bc 100644
--- a/lib/vdsm/config.py.in
+++ b/lib/vdsm/config.py.in
@@ -249,6 +249,17 @@
 
     ]),
 
+    # Section: [rpc]
+    ('rpc', [
+
+        ('worker_threads', '8',
+            'Number of worker threads to serve jsonrpc server.'),
+
+        ('tasks_per_worker', '10',
+            'Max number of tasks which can be queued per workers.'),
+
+    ]),
+
     # Section: [mom]
     ('mom', [
 
diff --git a/lib/yajsonrpc/__init__.py b/lib/yajsonrpc/__init__.py
index 53b4e2f..9ccce62 100644
--- a/lib/yajsonrpc/__init__.py
+++ b/lib/yajsonrpc/__init__.py
@@ -582,7 +582,19 @@
         if self._threadFactory is None:
             self._serveRequest(ctx, request)
         else:
-            self._threadFactory(partial(self._serveRequest, ctx, request))
+            try:
+                self._threadFactory(partial(self._serveRequest, ctx, request))
+            except Exception as e:
+                self.log.exception("could not allocate request thread")
+                ctx.requestDone(
+                    JsonRpcResponse(
+                        None,
+                        JsonRpcInternalError(
+                            str(e)
+                        ),
+                        request.id
+                    )
+                )
 
     def stop(self):
         self.log.info("Stopping JsonRPC Server")
diff --git a/tests/jsonRpcHelper.py b/tests/jsonRpcHelper.py
index 7c4ed40..30905a0 100644
--- a/tests/jsonRpcHelper.py
+++ b/tests/jsonRpcHelper.py
@@ -34,6 +34,8 @@
 from protocoldetector import MultiProtocolAcceptor
 from yajsonrpc import JsonRpcClientPool
 from rpc.BindingJsonRpc import BindingJsonRpc
+from vdsm import schedule
+from vdsm import utils
 
 
 CERT_DIR = os.path.abspath(os.path.dirname(__file__))
@@ -80,7 +82,11 @@
     xmlDetector = XmlDetector(xml_binding)
     acceptor.add_detector(xmlDetector)
 
-    json_binding = BindingJsonRpc(jsonBridge)
+    scheduler = schedule.Scheduler(name="test.Scheduler",
+                                   clock=utils.monotonic_time)
+    scheduler.start()
+
+    json_binding = BindingJsonRpc(jsonBridge, scheduler)
     json_binding.start()
     stompDetector = StompDetector(json_binding)
     acceptor.add_detector(stompDetector)
@@ -96,6 +102,7 @@
         acceptor.stop()
         json_binding.stop()
         xml_binding.stop()
+        scheduler.stop(wait=False)
 
 
 @contextmanager
diff --git a/tests/jsonRpcTests.py b/tests/jsonRpcTests.py
index 02c132f..85b3b30 100644
--- a/tests/jsonRpcTests.py
+++ b/tests/jsonRpcTests.py
@@ -21,6 +21,7 @@
 from clientIF import clientIF
 from contextlib import contextmanager
 from monkeypatch import MonkeyPatch
+from vdsm import executor
 
 from testrunner import VdsmTestCase as TestCaseBase, \
     expandPermutations, \
@@ -66,6 +67,10 @@
 
 def getInstance():
     return FakeClientIf()
+
+
+def dispatch(callable, timeout=None):
+    raise executor.TooManyTasks
 
 
 @expandPermutations
@@ -176,3 +181,20 @@
                     res = self._callTimeout(client, "ping", [],
                                             CALL_ID, timeout=CALL_TIMEOUT)
                     self.assertEquals(res, True)
+
+    @MonkeyPatch(clientIF, 'getInstance', getInstance)
+    @MonkeyPatch(executor.Executor, 'dispatch', dispatch)
+    @permutations(PERMUTATIONS)
+    def testFullExecutor(self, ssl, type):
+        bridge = _DummyBridge()
+        with constructClient(self.log, bridge, ssl, type) as clientFactory:
+            with self._client(clientFactory) as client:
+                if type == "xml":
+                    # TODO start using executor for xmlrpc
+                    pass
+                else:
+                    with self.assertRaises(JsonRpcError) as cm:
+                        self._callTimeout(client, "no_method", [], CALL_ID)
+
+                    self.assertEquals(cm.exception.code,
+                                      JsonRpcInternalError().code)
diff --git a/vdsm/clientIF.py b/vdsm/clientIF.py
index f300ee0..f65dd9d 100644
--- a/vdsm/clientIF.py
+++ b/vdsm/clientIF.py
@@ -65,7 +65,7 @@
     _instance = None
     _instanceLock = threading.Lock()
 
-    def __init__(self, irs, log):
+    def __init__(self, irs, log, scheduler):
         """
         Initialize the (single) clientIF instance
 
@@ -87,6 +87,7 @@
         self._generationID = str(uuid.uuid4())
         self.mom = None
         self.bindings = {}
+        self._scheduler = scheduler
         if _glusterEnabled:
             self.gluster = gapi.GlusterApi(self, log)
         else:
@@ -148,14 +149,14 @@
                         vmObj.cont()
 
     @classmethod
-    def getInstance(cls, irs=None, log=None):
+    def getInstance(cls, irs=None, log=None, scheduler=None):
         with cls._instanceLock:
             if cls._instance is None:
                 if log is None:
                     raise Exception("Logging facility is required to create "
                                     "the single clientIF instance")
                 else:
-                    cls._instance = clientIF(irs, log)
+                    cls._instance = clientIF(irs, log, scheduler)
         return cls._instance
 
     def _createAcceptor(self, host, port):
@@ -200,7 +201,7 @@
                               'Please make sure it is installed.')
             else:
                 bridge = Bridge.DynamicBridge()
-                json_binding = BindingJsonRpc(bridge)
+                json_binding = BindingJsonRpc(bridge, self._scheduler)
                 self.bindings['jsonrpc'] = json_binding
                 stomp_detector = StompDetector(json_binding)
                 self._acceptor.add_detector(stomp_detector)
diff --git a/vdsm/rpc/BindingJsonRpc.py b/vdsm/rpc/BindingJsonRpc.py
index a5b5b4a..ddaad5e 100644
--- a/vdsm/rpc/BindingJsonRpc.py
+++ b/vdsm/rpc/BindingJsonRpc.py
@@ -19,18 +19,26 @@
 from yajsonrpc import JsonRpcServer
 from yajsonrpc.stompReactor import StompReactor
 
+from vdsm import executor
+from vdsm.config import config
 
-def _simpleThreadFactory(func):
-    t = threading.Thread(target=func)
-    t.setDaemon(False)
-    t.start()
+
+# TODO test what should be the default values
+_THREADS = config.getint('rpc', 'worker_threads')
+_TASK_PER_WORKER = config.getint('rpc', 'tasks_per_worker')
+_TASKS = _THREADS * _TASK_PER_WORKER
 
 
 class BindingJsonRpc(object):
     log = logging.getLogger('BindingJsonRpc')
 
-    def __init__(self, bridge):
-        self._server = JsonRpcServer(bridge, _simpleThreadFactory)
+    def __init__(self, bridge, scheduler):
+        self._executor = executor.Executor(name="jsonrpc.Executor",
+                                           workers_count=_THREADS,
+                                           max_tasks=_TASKS,
+                                           scheduler=scheduler)
+
+        self._server = JsonRpcServer(bridge, self._executor.dispatch)
         self._reactors = []
 
     def add_socket(self, reactor, client_socket):
@@ -46,6 +54,8 @@
         return reactor
 
     def start(self):
+        self._executor.start()
+
         t = threading.Thread(target=self._server.serve_requests,
                              name='JsonRpcServer')
         t.setDaemon(True)
@@ -62,3 +72,4 @@
         self._server.stop()
         for reactor in self._reactors:
             reactor.stop()
+        self._executor.stop()


-- 
To view, visit https://gerrit.ovirt.org/48198
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I56b307633a8bf7e4aad8f87cc97a4129c9ed0970
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: ovirt-3.5
Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com>
Gerrit-Reviewer: automat...@ovirt.org
_______________________________________________
vdsm-patches mailing list
vdsm-patches@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to