Nir Soffer has uploaded a new change for review. Change subject: executor: Replace _ExecutorTask with _Worker ......................................................................
executor: Replace _ExecutorTask with _Worker Previously we would create an _ExecutorTask object for each task. This made sense in an earlier version where this object was returned to the caller, and the caller was using the object for discarding the task. This patch replaces _ExecutorTask with _Worker. Instead of creating one object per task, we create a few long-lived _Worker objects, and keep task info in tuples, which are more lightweight. The _Worker object keeps the code that was in _ExecutorTask and some code that was in the Executor class, which did too many things before. A new _WorkerBusy exception is now raised when joining a busy worker, making joining less racy and safer, as you can no longer join a busy worker by forgetting to check the busy property. This also fixes a bug where the scheduler was called with the wrong order of parameters, which was hidden by the fake scheduler that used the same wrong order. Change-Id: I41760e33a4f48f84421969ac847ea5749192fd3d Signed-off-by: Nir Soffer <[email protected]> --- M lib/vdsm/executor.py M tests/executorTests.py 2 files changed, 88 insertions(+), 70 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/44/30744/1 diff --git a/lib/vdsm/executor.py b/lib/vdsm/executor.py index 547f8b4..2e7b0b1 100644 --- a/lib/vdsm/executor.py +++ b/lib/vdsm/executor.py @@ -54,7 +54,6 @@ self._cond = threading.Condition(threading.Lock()) self._running = False self._scheduler = scheduler - self._worker_id = 0 def start(self): self._log.debug('Starting executor') @@ -73,9 +72,10 @@ workers = tuple(self._workers) if wait else () self._cond.notify_all() # wake up all the workers for worker in workers: - if not worker.busy: - self._log.debug('Waiting for worker %s', worker.name) + try: worker.join() + except _WorkerBusy: + self._log.debug('Skipping busy worker %s', worker.name) def dispatch(self, callable, timeout=None): """dispatches a new task to the executor. @@ -83,56 +83,35 @@ The task may be any callable. 
The task will be executed as soon as possible in one of the active workers of the executor.""" - task = _ExecutorTask(callable, timeout, self._scheduler, - self._task_discarded) with self._cond: if not self._running: raise NotRunning() - self._tasks.put(task) + self._tasks.put((callable, timeout)) self._cond.notify() # wake up one worker to handle this task - # Managing workers + # Serving workers - def _add_worker(self): - self._worker_id += 1 - worker = threading.Thread(target=self._run, - name='Executor-%i' % self._worker_id) - worker.daemon = True - worker.busy = False - self._log.debug('Starting worker %s' % worker.name) - worker.start() - self._workers.add(worker) - self._workers.add(worker) - - def _task_discarded(self): + def _worker_discarded(self, worker): + """ + Called from shceduler thread when worker was discarded. The worker + thread is blocked on a task, and will exit when the task finish. + """ with self._cond: if self._running: self._add_worker() - @utils.traceback(on=_log.name) - def _run(self): - worker = threading.current_thread() - self._log.debug('Worker %s started' % worker.name) - try: - self._loop() - except NotRunning: - self._log.debug('Worker %s stopped' % worker.name) - except _TaskDiscarded: - self._log.debug('Worker %s was discarded' % worker.name) - finally: - with self._cond: - self._workers.remove(worker) - - def _loop(self): - while True: - task = self._next_task() - self.busy = True - try: - task.execute() - finally: - self.busy = False + def _worker_stopped(self, worker): + """ + Called from worker thread before it exit. + """ + with self._cond: + self._workers.remove(worker) def _next_task(self): + """ + Called from worker thread to get the next task from the taks queue. + Raises NotRunning exception if executor was stopped. 
+ """ with self._cond: while self._running and self._tasks.empty: self._cond.wait() @@ -140,55 +119,94 @@ raise NotRunning() return self._tasks.get() + # Private -class _TaskDiscarded(Exception): - pass + def _add_worker(self): + worker = _Worker(self, self._scheduler) + self._workers.add(worker) -class _ExecutorTask(object): +class _WorkerBusy(Exception): + """ Raised when trying to join a busy worker """ - _log = logging.getLogger('Executor.Task') - def __init__(self, callable, timeout, scheduler, on_task_discarded): - self._callable = callable - self._timeout = timeout +class _WorkerDiscarded(Exception): + """ Raised if worker was discarded during execution of a task """ + + +class _Worker(object): + + _log = logging.getLogger('Executor') + _id = 0 + + def __init__(self, executor, scheduler): + self._executor = executor self._scheduler = scheduler - self._on_task_discarded = on_task_discarded self._lock = threading.Lock() - self._executing = False + self._busy = False self._discarded = False + _Worker._id += 1 + name = "Executor-%i" % _Worker._id + self._thread = threading.Thread(target=self._run, name=name) + self._thread.daemon = True + self._log.debug('Starting worker %s' % name) + self._thread.start() - def execute(self): - """called by the worker thread in Executor to perform the task.""" + @property + def name(self): + return self._thread.name + def join(self): with self._lock: - if self._executing: - raise AssertionError("Attempt to execute twice task %s", - self._callable) - self._executing = True - if self._timeout is not None: - self._scheduler.schedule(self._discard, self._timeout) + if self._busy: + raise _WorkerBusy() + self._log.debug('Waiting for worker %s', self.name) + self._thread.join() + + @utils.traceback(on=_log.name) + def _run(self): + self._log.debug('Worker started') try: - self._callable() + while True: + self._execute_task() + except NotRunning: + self._log.debug('Worker stopped') + except _WorkerDiscarded: + self._log.debug('Worker 
was discarded') + finally: + self._executor._worker_stopped(self) + + def _execute_task(self): + callable, timeout = self._executor._next_task() + discard = self._discard_after(timeout) + with self._lock: + self._busy = True + try: + callable() except Exception: - self._log.exception("Unhandled exception in %s", self._callable) + self._log.exception("Unhandled exception in %s", callable) finally: with self._lock: - self._scheduler = None - self._callable = None - self._on_task_discarded = None - self._executing = False + self._busy = False if self._discarded: - raise _TaskDiscarded() + raise _WorkerDiscarded() + if discard is not None: + discard.cancel() + + def _discard_after(self, timeout): + if timeout is not None: + return self._scheduler.schedule(timeout, self._discard) + return None def _discard(self): with self._lock: - if not self._executing or self._discarded: + if self._discarded: + raise AssertionError("Attempt to discard worker twice") + if not self._busy: return - self._log.debug("ExecutorTask %s was discarded", self._callable) + self._log.debug("Discarding worker %s", self.name) self._discarded = True - on_task_discarded = self._on_task_discarded - on_task_discarded() + self._executor._worker_discarded(self) class _Queue(object): diff --git a/tests/executorTests.py b/tests/executorTests.py index 0d332ba..37a5cb5 100644 --- a/tests/executorTests.py +++ b/tests/executorTests.py @@ -76,6 +76,6 @@ def __init__(self): self._timer = None - def schedule(self, callable, timeout): + def schedule(self, timeout, callable): self._timer = threading.Timer(timeout, callable) self._timer.start() -- To view, visit http://gerrit.ovirt.org/30744 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I41760e33a4f48f84421969ac847ea5749192fd3d Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Nir Soffer <[email protected]> _______________________________________________ vdsm-patches mailing list 
[email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
