Hello Dan Kenigsberg, Francesco Romani,

I'd like you to do a code review.  Please visit

    https://gerrit.ovirt.org/48196

to review the following change.

Change subject: executor: introduce the executor library
......................................................................

executor: introduce the executor library

Executor is a thread pool augmented with the
capability to discard blocked worker threads,
replacing them with fresh ones.

This is needed to accommodate the needs of the sampling code.
The sampling needs to deal with possibly blocking libvirt
calls which need to enter the QEMU monitor,
and thus can get stuck due to QEMU being in D state.

Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Signed-off-by: Francesco Romani <from...@redhat.com>
Reviewed-on: http://gerrit.ovirt.org/29191
Reviewed-by: Nir Soffer <nsof...@redhat.com>
Reviewed-by: Dan Kenigsberg <dan...@redhat.com>
Signed-off-by: pkliczewski <piotr.kliczew...@gmail.com>
Bug-Url: https://bugzilla.redhat.com/1279950
---
M debian/vdsm-python.install
M lib/vdsm/Makefile.am
A lib/vdsm/executor.py
M tests/Makefile.am
A tests/executorTests.py
M vdsm.spec.in
6 files changed, 404 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/96/48196/2

diff --git a/debian/vdsm-python.install b/debian/vdsm-python.install
index c10381e..956f618 100644
--- a/debian/vdsm-python.install
+++ b/debian/vdsm-python.install
@@ -8,6 +8,7 @@
 ./usr/lib/python2.7/dist-packages/vdsm/constants.py
 ./usr/lib/python2.7/dist-packages/vdsm/define.py
 ./usr/lib/python2.7/dist-packages/vdsm/exception.py
+./usr/lib/python2.7/dist-packages/vdsm/executor.py
 ./usr/lib/python2.7/dist-packages/vdsm/ipwrapper.py
 ./usr/lib/python2.7/dist-packages/vdsm/libvirtconnection.py
 ./usr/lib/python2.7/dist-packages/vdsm/netconfpersistence.py
diff --git a/lib/vdsm/Makefile.am b/lib/vdsm/Makefile.am
index 6c8fb0f..ba3a2c8 100644
--- a/lib/vdsm/Makefile.am
+++ b/lib/vdsm/Makefile.am
@@ -27,6 +27,7 @@
        compat.py \
        define.py \
        exception.py \
+       executor.py \
        ipwrapper.py \
        libvirtconnection.py \
        netconfpersistence.py \
diff --git a/lib/vdsm/executor.py b/lib/vdsm/executor.py
new file mode 100644
index 0000000..fc7880e
--- /dev/null
+++ b/lib/vdsm/executor.py
@@ -0,0 +1,255 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""Thread-based executor.
+Blocked tasks may be discarded, and the worker pool is automatically
+replenished."""
+
+import collections
+import logging
+import threading
+
+from . import utils
+
+
+class NotRunning(Exception):
+    """Executor not yet started or shutting down."""
+
+
+class AlreadyStarted(Exception):
+    """Executor started multiple times."""
+
+
+class TooManyTasks(Exception):
+    """Too many tasks for this Executor."""
+
+
+class Executor(object):
+    """
+    Executes potentially blocking tasks in background
+    threads. Can replace stuck threads with fresh ones.
+    """
+
+    _log = logging.getLogger('Executor')
+
+    def __init__(self, name, workers_count, max_tasks, scheduler):
+        self._name = name
+        self._workers_count = workers_count
+        self._tasks = _TaskQueue(max_tasks)
+        self._scheduler = scheduler
+        self._workers = set()
+        self._lock = threading.Lock()
+        self._running = False
+
+    @property
+    def name(self):
+        return self._name
+
+    def start(self):
+        self._log.debug('Starting executor')
+        with self._lock:
+            if self._running:
+                raise AlreadyStarted()
+            self._running = True
+            for _ in xrange(self._workers_count):
+                self._add_worker()
+
+    def stop(self, wait=True):
+        self._log.debug('Stopping executor')
+        with self._lock:
+            self._running = False
+            self._tasks.clear()
+            for _ in xrange(self._workers_count):
+                self._tasks.put((_STOP, None))
+            workers = tuple(self._workers) if wait else ()
+        for worker in workers:
+            worker.join()
+
+    def dispatch(self, callable, timeout=None):
+        """
+        Dispatches a new task to the executor.
+
+        The task may be any callable.
+        The task will be executed as soon as possible
+        in one of the active workers of the executor.
+
+        The timeout is measured from the time the callable
+        is called.
+        """
+        if not self._running:
+            raise NotRunning()
+        self._tasks.put((callable, timeout))
+
+    # Serving workers
+
+    def _worker_discarded(self, worker):
+        """
+        Called from scheduler thread when worker was discarded. The worker
+        thread is blocked on a task, and will exit when the task finishes.
+        """
+        with self._lock:
+            if self._running:
+                self._add_worker()
+
+    def _worker_stopped(self, worker):
+        """
+        Called from worker thread before it exits.
+        """
+        with self._lock:
+            self._workers.remove(worker)
+
+    def _next_task(self):
+        """
+        Called from worker thread to get the next task from the task queue.
+        Raises NotRunning exception if executor was stopped.
+        """
+        task, timeout = self._tasks.get()
+        if task is _STOP:
+            raise NotRunning()
+        return task, timeout
+
+    # Private
+
+    def _add_worker(self):
+        worker = _Worker(self, self._scheduler)
+        self._workers.add(worker)
+
+
+_STOP = object()
+
+
+class _WorkerDiscarded(Exception):
+    """ Raised if worker was discarded during execution of a task """
+
+
+class _Worker(object):
+
+    _log = logging.getLogger('Executor')
+    _id = 0
+
+    def __init__(self, executor, scheduler):
+        self._executor = executor
+        self._scheduler = scheduler
+        self._discarded = False
+        _Worker._id += 1
+        name = "%s-worker-%d" % (self._executor.name, _Worker._id)
+        self._thread = threading.Thread(target=self._run, name=name)
+        self._thread.daemon = True
+        self._log.debug('Starting worker %s', name)
+        self._thread.start()
+
+    @property
+    def name(self):
+        return self._thread.name
+
+    def join(self):
+        self._log.debug('Waiting for worker %s', self.name)
+        self._thread.join()
+
+    @utils.traceback(on=_log.name)
+    def _run(self):
+        self._log.debug('Worker started')
+        try:
+            while True:
+                self._execute_task()
+        except NotRunning:
+            self._log.debug('Worker stopped')
+        except _WorkerDiscarded:
+            self._log.debug('Worker was discarded')
+        finally:
+            self._executor._worker_stopped(self)
+
+    def _execute_task(self):
+        callable, timeout = self._executor._next_task()
+        discard = self._discard_after(timeout)
+        try:
+            callable()
+        except Exception:
+            self._log.exception("Unhandled exception in %s", callable)
+        finally:
+            # We want to discard workers that were too slow to disarm
+            # the timer. It does not matter if the thread was still
+            # blocked on callable when we discard it or it just finished.
+            # However, we expect that most of times only blocked threads
+            # will be discarded.
+            if discard is not None:
+                discard.cancel()
+            if self._discarded:
+                raise _WorkerDiscarded()
+
+    def _discard_after(self, timeout):
+        if timeout is not None:
+            return self._scheduler.schedule(timeout, self._discard)
+        return None
+
+    def _discard(self):
+        if self._discarded:
+            raise AssertionError("Attempt to discard worker twice")
+        self._discarded = True
+        self._log.debug("Worker %s discarded", self.name)
+        self._executor._worker_discarded(self)
+
+
+class _TaskQueue(object):
+    """
+    Replacement for Queue.Queue, with two important changes:
+
+    * Queue.Queue blocks when full. We want to raise TooManyTasks instead.
+    * Queue.Queue lacks the clear() operation, which is needed to implement
+      the 'poison pill' pattern (described for example in
+      http://pymotw.com/2/multiprocessing/communication.html )
+    """
+
+    def __init__(self, max_tasks):
+        self._max_tasks = max_tasks
+        self._tasks = collections.deque()
+        # deque is thread safe - we can append and pop from both ends without
+        # additional locking. We need this condition only for waiting. See:
+        # https://docs.python.org/2.6/library/queue.html#Queue.Full
+        self._cond = threading.Condition(threading.Lock())
+
+    def put(self, task):
+        """
+        Put a new task in the queue.
+        Do not block when full, raises TooManyTasks instead.
+        """
+        # There is a race here but we don't really mind. Worst case scenario,
+        # we will have a few more tasks than the configured maximum, but we
+        # just want to avoid having an unbounded number of tasks.
+        if len(self._tasks) >= self._max_tasks:
+            raise TooManyTasks()
+        self._tasks.append(task)
+        with self._cond:
+            self._cond.notify()
+
+    def get(self):
+        """
+        Get a new task. Blocks if empty.
+        """
+        while True:
+            try:
+                return self._tasks.popleft()
+            except IndexError:
+                with self._cond:
+                    if not self._tasks:
+                        self._cond.wait()
+
+    def clear(self):
+        self._tasks.clear()
diff --git a/tests/Makefile.am b/tests/Makefile.am
index b617c6f..bd039f3 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -31,6 +31,7 @@
        clientifTests.py \
        cmdutilsTests.py \
        configNetworkTests.py \
+       executorTests.py \
        fileVolumeTests.py \
        fileUtilTests.py \
        fuserTests.py \
diff --git a/tests/executorTests.py b/tests/executorTests.py
new file mode 100644
index 0000000..d56fcb1
--- /dev/null
+++ b/tests/executorTests.py
@@ -0,0 +1,145 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import threading
+import time
+
+from vdsm import executor
+from vdsm import schedule
+
+from testValidation import slowtest
+from testrunner import VdsmTestCase as TestCaseBase
+
+
+class ExecutorTests(TestCaseBase):
+
+    def setUp(self):
+        self.scheduler = schedule.Scheduler()
+        self.scheduler.start()
+        self.executor = executor.Executor('test',
+                                          workers_count=10, max_tasks=20,
+                                          scheduler=self.scheduler)
+        self.executor.start()
+        time.sleep(0.1)  # Give time to start all threads
+
+    def tearDown(self):
+        self.executor.stop()
+        self.scheduler.stop()
+
+    def test_dispatch_not_running(self):
+        self.executor.stop()
+        self.assertRaises(executor.NotRunning,
+                          self.executor.dispatch,
+                          Task())
+
+    def test_start_twice(self):
+        self.assertRaises(executor.AlreadyStarted,
+                          self.executor.start)
+
+    def test_dispatch(self):
+        task = Task()
+        self.executor.dispatch(task)
+        task.executed.wait(0.1)
+        self.assertTrue(task.executed.is_set())
+
+    def test_dispatch_after_fault(self):
+        faulty_task = Task(error=RuntimeError("fake error"))
+        self.executor.dispatch(faulty_task)
+        faulty_task.executed.wait(0.1)
+        task = Task()
+        self.executor.dispatch(task)
+        task.executed.wait(0.1)
+        self.assertTrue(task.executed.is_set())
+
+    @slowtest
+    def test_dispatch_with_timeout(self):
+        task = Task(wait=0.2)
+        self.executor.dispatch(task, 0.1)
+        task.executed.wait(0.3)
+        self.assertTrue(task.executed.is_set())  # task must have executed!
+
+    def test_too_many_tasks(self):
+        tasks = [Task(wait=0.1) for n in xrange(31)]
+        with self.assertRaises(executor.TooManyTasks):
+            for task in tasks:
+                self.executor.dispatch(task)
+
+    @slowtest
+    def test_concurrency(self):
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.3)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
+
+    @slowtest
+    def test_blocked_workers(self):
+        slow_tasks = [Task(wait=0.4) for n in xrange(5)]
+        for task in slow_tasks:
+            self.executor.dispatch(task, 1.0)
+        # Slow tasks block half of the workers
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.5)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
+        for task in slow_tasks:
+            self.assertTrue(task.executed.is_set())
+
+    @slowtest
+    def test_discarded_workers(self):
+        slow_tasks = [Task(wait=0.4) for n in xrange(10)]
+        for task in slow_tasks:
+            self.executor.dispatch(task, 0.1)
+        # All workers are blocked on slow tasks
+        time.sleep(0.1)
+        # Blocked workers should be replaced with new workers
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.3)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
+        for task in slow_tasks:
+            self.assertTrue(task.executed.is_set())
+        # Discarded workers should exit, executor should operate normally
+        tasks = [Task(wait=0.1) for n in xrange(20)]
+        for task in tasks:
+            self.executor.dispatch(task, 1.0)
+        time.sleep(0.3)
+        for task in tasks:
+            self.assertTrue(task.executed.is_set())
+
+
+class Task(object):
+
+    def __init__(self, wait=None, error=None):
+        self.wait = wait
+        self.error = error
+        self.executed = threading.Event()
+
+    def __call__(self):
+        if self.wait:
+            time.sleep(self.wait)
+        self.executed.set()
+        if self.error:
+            raise self.error
diff --git a/vdsm.spec.in b/vdsm.spec.in
index bdd8bc9..2502db4 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1269,6 +1269,7 @@
 %{python_sitelib}/%{vdsm_name}/constants.py*
 %{python_sitelib}/%{vdsm_name}/define.py*
 %{python_sitelib}/%{vdsm_name}/exception.py*
+%{python_sitelib}/%{vdsm_name}/executor.py*
 %{python_sitelib}/%{vdsm_name}/ipwrapper.py*
 %{python_sitelib}/%{vdsm_name}/libvirtconnection.py*
 %{python_sitelib}/%{vdsm_name}/netinfo.py*
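
The discard-and-replace idea behind lib/vdsm/executor.py can be sketched in
isolation. This is only a minimal illustration under stated assumptions, not
the code under review: MiniPool and its attributes are hypothetical names, and
threading.Timer stands in for the vdsm scheduler that the real Executor
receives. A worker whose task outlives its timeout is marked as discarded, a
fresh worker is started at once, and the stuck thread exits by itself when its
task eventually returns.

```python
import collections
import threading


class MiniPool(object):
    """Hypothetical toy pool: stuck workers are replaced, never killed."""

    def __init__(self, workers_count):
        self._tasks = collections.deque()
        self._cond = threading.Condition()
        self._lock = threading.Lock()
        self.spawned = 0    # workers started so far
        self.discarded = 0  # workers given up on
        with self._lock:
            for _ in range(workers_count):
                self._add_worker()

    def dispatch(self, func, timeout=None):
        with self._cond:
            self._tasks.append((func, timeout))
            self._cond.notify()

    def _add_worker(self):
        # Caller must hold self._lock.
        self.spawned += 1
        thread = threading.Thread(target=self._run)
        thread.daemon = True
        thread.start()

    def _run(self):
        while True:
            with self._cond:
                while not self._tasks:
                    self._cond.wait()
                func, timeout = self._tasks.popleft()
            state = {'discarded': False}
            timer = None
            if timeout is not None:
                # Assumption: a Timer replaces the scheduler used by the patch.
                timer = threading.Timer(timeout, self._discard, args=(state,))
                timer.daemon = True
                timer.start()
            try:
                func()
            except Exception:
                pass  # a real implementation would log the failure
            finally:
                if timer is not None:
                    timer.cancel()
            if state['discarded']:
                return  # a replacement is already running

    def _discard(self, state):
        # Runs in the timer thread while the worker is still blocked.
        with self._lock:
            state['discarded'] = True
            self.discarded += 1
            self._add_worker()


pool = MiniPool(workers_count=1)
release = threading.Event()
done = threading.Event()
pool.dispatch(release.wait, timeout=0.05)  # blocks the only worker
pool.dispatch(done.set)                    # needs a free worker to run
finished = done.wait(2.0)                  # set only by a replacement worker
release.set()                              # let the stuck worker return and exit
```

With a single worker, the second task can only run once the timer has fired
and a replacement worker has been started, which is the situation the commit
message describes for libvirt calls stuck in the QEMU monitor.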


-- 
To view, visit https://gerrit.ovirt.org/48196
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 2
Gerrit-Project: vdsm
Gerrit-Branch: ovirt-3.5
Gerrit-Owner: Piotr Kliczewski <piotr.kliczew...@gmail.com>
Gerrit-Reviewer: Dan Kenigsberg <dan...@redhat.com>
Gerrit-Reviewer: Francesco Romani <from...@redhat.com>
Gerrit-Reviewer: automat...@ovirt.org
_______________________________________________
vdsm-patches mailing list
vdsm-patches@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
