Nir Soffer has uploaded a new change for review. Change subject: executor: Optimize locking ......................................................................
executor: Optimize locking Profiles show that locking is our biggest overhead in the executor. This patch optimizes locking in the executor: - Remove the _Worker lock, it should work correctly without a lock. - Separate the Executor and _TaskQueue locking. The Executor now uses a lock, used to start, stop and resize the executor. These operations are rare. - Move the Condition into the _TaskQueue, and use it only for waiting when the deque is empty. deque is thread safe providing fast atomic append and popleft methods. Change-Id: I1df60abe8c85b57030494a5c031a0db35e6d8354 Signed-off-by: Nir Soffer <[email protected]> --- M lib/vdsm/executor.py 1 file changed, 46 insertions(+), 41 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/86/30986/1 diff --git a/lib/vdsm/executor.py b/lib/vdsm/executor.py index 64384f3..02a80b2 100644 --- a/lib/vdsm/executor.py +++ b/lib/vdsm/executor.py @@ -52,12 +52,12 @@ self._tasks = _TaskQueue(max_tasks) self._scheduler = scheduler self._workers = set() - self._cond = threading.Condition(threading.Lock()) + self._lock = threading.Lock() self._running = False def start(self): self._log.debug('Starting executor') - with self._cond: + with self._lock: if self._running: raise AlreadyStarted() self._running = True @@ -66,11 +66,12 @@ def stop(self, wait=True): self._log.debug('Stopping executor') - with self._cond: + with self._lock: self._running = False self._tasks.clear() + for i in range(self._workers_count): + self._tasks.put((_STOP, 0)) workers = tuple(self._workers) if wait else () - self._cond.notify_all() # wake up all the workers for worker in workers: try: worker.join() @@ -83,11 +84,9 @@ The task may be any callable. 
The task will be executed as soon as possible in one of the active workers of the executor.""" - with self._cond: - if not self._running: - raise NotRunning() - self._tasks.put((callable, timeout)) - self._cond.notify() # wake up one worker to handle this task + if not self._running: + raise NotRunning() + self._tasks.put((callable, timeout)) # Serving workers @@ -96,7 +95,7 @@ Called from shceduler thread when worker was discarded. The worker thread is blocked on a task, and will exit when the task finish. """ - with self._cond: + with self._lock: if self._running: self._add_worker() @@ -104,7 +103,7 @@ """ Called from worker thread before it exit. """ - with self._cond: + with self._lock: self._workers.remove(worker) def _next_task(self): @@ -112,18 +111,19 @@ Called from worker thread to get the next task from the taks queue. Raises NotRunning exception if executor was stopped. """ - with self._cond: - while self._running and self._tasks.empty(): - self._cond.wait() - if not self._running: - raise NotRunning() - return self._tasks.get() + task, timeout = self._tasks.get() + if task is _STOP: + raise NotRunning() + return task, timeout # Private def _add_worker(self): worker = _Worker(self, self._scheduler) self._workers.add(worker) + + +_STOP = object() class _WorkerBusy(Exception): @@ -142,7 +142,6 @@ def __init__(self, executor, scheduler): self._executor = executor self._scheduler = scheduler - self._lock = threading.Lock() self._busy = False self._discarded = False _Worker._id += 1 @@ -157,9 +156,8 @@ return self._thread.name def join(self): - with self._lock: - if self._busy: - raise _WorkerBusy() + if self._busy: + raise _WorkerBusy() self._log.debug('Waiting for worker %s', self.name) self._thread.join() @@ -179,17 +177,15 @@ def _execute_task(self): callable, timeout = self._executor._next_task() discard = self._discard_after(timeout) - with self._lock: - self._busy = True + self._busy = True try: callable() except Exception: 
self._log.exception("Unhandled exception in %s", callable) finally: - with self._lock: - self._busy = False - if self._discarded: - raise _WorkerDiscarded() + self._busy = False + if self._discarded: + raise _WorkerDiscarded() if discard is not None: discard.cancel() @@ -199,34 +195,43 @@ return None def _discard(self): - with self._lock: - if self._discarded: - raise AssertionError("Attempt to discard worker twice") - if not self._busy: - return - self._log.debug("Discarding worker %s", self.name) - self._discarded = True + if self._discarded: + raise AssertionError("Attempt to discard worker twice") + if not self._busy: + return + self._discarded = True + self._log.debug("Worker %s discarded", self.name) self._executor._worker_discarded(self) class _TaskQueue(object): - """Queue.Queue doesn't support the clear() operation. - All the locking must be provided by the caller.""" + """ Queue.Queue doesn't support the clear() operation. """ def __init__(self, max_tasks): self._max_tasks = max_tasks self._tasks = collections.deque() - - def empty(self): - return len(self._tasks) == 0 + # deque is thread safe - we can append and pop from both ends without + # additional locking. We need this condition only for waiting. 
+ self._cond = threading.Condition(threading.Lock()) + self._waiters = 0 def put(self, task): if len(self._tasks) == self._max_tasks: raise TooManyTasks() - self._tasks.appendleft(task) + self._tasks.append(task) + if self._waiters > 0: + with self._cond: + self._cond.notify() def get(self): - return self._tasks.pop() + while True: + try: + return self._tasks.popleft() + except IndexError: + with self._cond: + self._waiters += 1 + self._cond.wait() + self._waiters -= 1 def clear(self): self._tasks.clear() -- To view, visit http://gerrit.ovirt.org/30986 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I1df60abe8c85b57030494a5c031a0db35e6d8354 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Nir Soffer <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
