Nir Soffer has uploaded a new change for review.

Change subject: executor: Replace _ExecutorTask with _Worker
......................................................................

executor: Replace _ExecutorTask with _Worker

Previously we would create an _ExecutorTask object for each task. This
made sense in an earlier version where this object was returned to the
caller, and the caller was using the object for discarding the task.

This patch replaces _ExecutorTask with _Worker. Instead of creating one
object per task, we create a few long-lived _Worker objects, and keep
task info in tuples, which are more lightweight.

The _Worker object keeps the code that was in _ExecutorTask and some code
that was in the Executor class, which did too many things before.

A new _WorkerBusy exception is now raised when joining a busy worker,
making joining less racy and safer, as you can no longer join a busy
worker by forgetting to check the busy property.

This also fixes a bug where the scheduler was called with the wrong order
of parameters, which was hidden by the fake scheduler that used the same
wrong order.

Change-Id: I41760e33a4f48f84421969ac847ea5749192fd3d
Signed-off-by: Nir Soffer <[email protected]>
---
M lib/vdsm/executor.py
M tests/executorTests.py
2 files changed, 88 insertions(+), 70 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/44/30744/1

diff --git a/lib/vdsm/executor.py b/lib/vdsm/executor.py
index 547f8b4..2e7b0b1 100644
--- a/lib/vdsm/executor.py
+++ b/lib/vdsm/executor.py
@@ -54,7 +54,6 @@
         self._cond = threading.Condition(threading.Lock())
         self._running = False
         self._scheduler = scheduler
-        self._worker_id = 0
 
     def start(self):
         self._log.debug('Starting executor')
@@ -73,9 +72,10 @@
             workers = tuple(self._workers) if wait else ()
             self._cond.notify_all()  # wake up all the workers
         for worker in workers:
-            if not worker.busy:
-                self._log.debug('Waiting for worker %s', worker.name)
+            try:
                 worker.join()
+            except _WorkerBusy:
+                self._log.debug('Skipping busy worker %s', worker.name)
 
     def dispatch(self, callable, timeout=None):
         """dispatches a new task to the executor.
@@ -83,56 +83,35 @@
         The task may be any callable.
         The task will be executed as soon as possible
         in one of the active workers of the executor."""
-        task = _ExecutorTask(callable, timeout, self._scheduler,
-                             self._task_discarded)
         with self._cond:
             if not self._running:
                 raise NotRunning()
-            self._tasks.put(task)
+            self._tasks.put((callable, timeout))
             self._cond.notify()  # wake up one worker to handle this task
 
-    # Managing workers
+    # Serving workers
 
-    def _add_worker(self):
-        self._worker_id += 1
-        worker = threading.Thread(target=self._run,
-                                  name='Executor-%i' % self._worker_id)
-        worker.daemon = True
-        worker.busy = False
-        self._log.debug('Starting worker %s' % worker.name)
-        worker.start()
-        self._workers.add(worker)
-        self._workers.add(worker)
-
-    def _task_discarded(self):
+    def _worker_discarded(self, worker):
+        """
+        Called from scheduler thread when worker was discarded. The worker
+        thread is blocked on a task, and will exit when the task finishes.
+        """
         with self._cond:
             if self._running:
                 self._add_worker()
 
-    @utils.traceback(on=_log.name)
-    def _run(self):
-        worker = threading.current_thread()
-        self._log.debug('Worker %s started' % worker.name)
-        try:
-            self._loop()
-        except NotRunning:
-            self._log.debug('Worker %s stopped' % worker.name)
-        except _TaskDiscarded:
-            self._log.debug('Worker %s was discarded' % worker.name)
-        finally:
-            with self._cond:
-                self._workers.remove(worker)
-
-    def _loop(self):
-        while True:
-            task = self._next_task()
-            self.busy = True
-            try:
-                task.execute()
-            finally:
-                self.busy = False
+    def _worker_stopped(self, worker):
+        """
+        Called from worker thread before it exits.
+        """
+        with self._cond:
+            self._workers.remove(worker)
 
     def _next_task(self):
+        """
+        Called from worker thread to get the next task from the tasks queue.
+        Raises NotRunning exception if executor was stopped.
+        """
         with self._cond:
             while self._running and self._tasks.empty:
                 self._cond.wait()
@@ -140,55 +119,94 @@
                 raise NotRunning()
             return self._tasks.get()
 
+    # Private
 
-class _TaskDiscarded(Exception):
-    pass
+    def _add_worker(self):
+        worker = _Worker(self, self._scheduler)
+        self._workers.add(worker)
 
 
-class _ExecutorTask(object):
+class _WorkerBusy(Exception):
+    """ Raised when trying to join a busy worker """
 
-    _log = logging.getLogger('Executor.Task')
 
-    def __init__(self, callable, timeout, scheduler, on_task_discarded):
-        self._callable = callable
-        self._timeout = timeout
+class _WorkerDiscarded(Exception):
+    """ Raised if worker was discarded during execution of a task """
+
+
+class _Worker(object):
+
+    _log = logging.getLogger('Executor')
+    _id = 0
+
+    def __init__(self, executor, scheduler):
+        self._executor = executor
         self._scheduler = scheduler
-        self._on_task_discarded = on_task_discarded
         self._lock = threading.Lock()
-        self._executing = False
+        self._busy = False
         self._discarded = False
+        _Worker._id += 1
+        name = "Executor-%i" % _Worker._id
+        self._thread = threading.Thread(target=self._run, name=name)
+        self._thread.daemon = True
+        self._log.debug('Starting worker %s' % name)
+        self._thread.start()
 
-    def execute(self):
-        """called by the worker thread in Executor to perform the task."""
+    @property
+    def name(self):
+        return self._thread.name
 
+    def join(self):
         with self._lock:
-            if self._executing:
-                raise AssertionError("Attempt to execute twice task %s",
-                                     self._callable)
-            self._executing = True
-            if self._timeout is not None:
-                self._scheduler.schedule(self._discard, self._timeout)
+            if self._busy:
+                raise _WorkerBusy()
+        self._log.debug('Waiting for worker %s', self.name)
+        self._thread.join()
+
+    @utils.traceback(on=_log.name)
+    def _run(self):
+        self._log.debug('Worker started')
         try:
-            self._callable()
+            while True:
+                self._execute_task()
+        except NotRunning:
+            self._log.debug('Worker stopped')
+        except _WorkerDiscarded:
+            self._log.debug('Worker was discarded')
+        finally:
+            self._executor._worker_stopped(self)
+
+    def _execute_task(self):
+        callable, timeout = self._executor._next_task()
+        discard = self._discard_after(timeout)
+        with self._lock:
+            self._busy = True
+        try:
+            callable()
         except Exception:
-            self._log.exception("Unhandled exception in %s", self._callable)
+            self._log.exception("Unhandled exception in %s", callable)
         finally:
             with self._lock:
-                self._scheduler = None
-                self._callable = None
-                self._on_task_discarded = None
-                self._executing = False
+                self._busy = False
                 if self._discarded:
-                    raise _TaskDiscarded()
+                    raise _WorkerDiscarded()
+            if discard is not None:
+                discard.cancel()
+
+    def _discard_after(self, timeout):
+        if timeout is not None:
+            return self._scheduler.schedule(timeout, self._discard)
+        return None
 
     def _discard(self):
         with self._lock:
-            if not self._executing or self._discarded:
+            if self._discarded:
+                raise AssertionError("Attempt to discard worker twice")
+            if not self._busy:
                 return
-            self._log.debug("ExecutorTask %s was discarded", self._callable)
+            self._log.debug("Discarding worker %s", self.name)
             self._discarded = True
-            on_task_discarded = self._on_task_discarded
-        on_task_discarded()
+        self._executor._worker_discarded(self)
 
 
 class _Queue(object):
diff --git a/tests/executorTests.py b/tests/executorTests.py
index 0d332ba..37a5cb5 100644
--- a/tests/executorTests.py
+++ b/tests/executorTests.py
@@ -76,6 +76,6 @@
     def __init__(self):
         self._timer = None
 
-    def schedule(self, callable, timeout):
+    def schedule(self, timeout, callable):
         self._timer = threading.Timer(timeout, callable)
         self._timer.start()


-- 
To view, visit http://gerrit.ovirt.org/30744
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I41760e33a4f48f84421969ac847ea5749192fd3d
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to