Nir Soffer has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................
Patch Set 11: (21 comments)

I think that this is the right direction.

To discard tasks, the code which submits the task should schedule a task.discard() call using the scheduler. The calling code should be able to tell the right timeout for this.

There are various issues in the implementation. See the inline comments. To shorten the review cycle, I posted a modified version that shows how all the issues can be solved: http://pastebin.com/xevn4b28

http://gerrit.ovirt.org/#/c/29191/11/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 28: import logging
Line 29: import Queue
Line 30: import threading
Line 31: 
Line 32: import utils
We can now use relative imports:

    from . import utils

Line 33: 
Line 34: 
Line 35: class ExecutorNotRunning(Exception):
Line 36: """

Line 31: 
Line 32: import utils
Line 33: 
Line 34: 
Line 35: class ExecutorNotRunning(Exception):
"executor.ExecutorNotRunning" repeats the "Executor" word for no reason. We have namespaces in Python, so there is no need to repeat stuff.

Line 36: """
Line 37: Executor not yet started or shutting down.
Line 38: """
Line 39: 

Line 37: Executor not yet started or shutting down.
Line 38: """
Line 39: 
Line 40: 
Line 41: class ExecutorAlreadyStarted(Exception):
"executor.ExecutorAlreadyStarted" repeats the "Executor" word for no reason.

Line 42: """
Line 43: Executor started multiple times.
Line 44: """
Line 45: 

Line 51: """
Line 52: 
Line 53: _log = logging.getLogger("Executor")
Line 54: 
Line 55: def __init__(self, workers=10, max_tasks=1000):
I don't think we need defaults, since we don't know how the executor is going to be used. The caller must specify this.
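The naming point above could look like this in practice - a minimal sketch, assuming the module is imported as executor, so the short class names still read unambiguously at the call site:

```python
# Sketch: exception names without the redundant "Executor" prefix.
# Callers refer to them through the module namespace, e.g.
# executor.NotRunning, so no information is lost.


class NotRunning(Exception):
    """Executor not yet started or shutting down."""


class AlreadyStarted(Exception):
    """Executor started multiple times."""
```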
Line 56: self._workers_count = workers
Line 57: self._tasks = Queue.Queue(max_tasks)
Line 58: self._workers = []
Line 59: self._lock = threading.Lock()

Line 54: 
Line 55: def __init__(self, workers=10, max_tasks=1000):
Line 56: self._workers_count = workers
Line 57: self._tasks = Queue.Queue(max_tasks)
Line 58: self._workers = []
Let's keep workers in a set(), making it easier to remove them when needed.

Line 59: self._lock = threading.Lock()
Line 60: self._running = False
Line 61: self._discards = 0 # for debug purposes only
Line 62: 

Line 57: self._tasks = Queue.Queue(max_tasks)
Line 58: self._workers = []
Line 59: self._lock = threading.Lock()
Line 60: self._running = False
Line 61: self._discards = 0 # for debug purposes only
I don't think we need this; logging each discard operation is enough.

Line 62: 
Line 63: def start(self):
Line 64: with self._lock:
Line 65: if self._running:

Line 65: if self._running:
Line 66: raise ExecutorAlreadyStarted
Line 67: for _ in range(self._workers_count):
Line 68: self._add_worker()
Line 69: self._running = True
Let's set self._running before starting the threads, which may use this value.

Line 70: 
Line 71: def stop(self, wait=True):
Line 72: for worker in self._workers:
Line 73: self._tasks.put(_STOP)

Line 69: self._running = True
Line 70: 
Line 71: def stop(self, wait=True):
Line 72: for worker in self._workers:
Line 73: self._tasks.put(_STOP)
This may block if the tasks queue is full, so this is not a good way to stop the threads. This will stop all threads *after* pending tasks are executed, which can cause threads to get stuck, and this method will never return when using wait=True.

I think that we would like to stop all waiting threads *now*, and drop pending tasks. To do this, we need to clear the queue and signal all threads to exit. This is not possible when using the builtin Queue. Queue contains many features which we don't need (task_done, join), but not what we need (clear).
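A queue with the missing clear() operation can be sketched on top of a deque and a condition variable (class and exception names here are illustrative, not vdsm's actual API):

```python
import collections
import threading


class TooManyTasks(Exception):
    pass


class TaskQueue(object):
    """Minimal bounded task queue sketch supporting clear(),
    which the builtin Queue lacks."""

    def __init__(self, max_tasks):
        self._max_tasks = max_tasks
        self._tasks = collections.deque()
        self._cond = threading.Condition(threading.Lock())

    def put(self, task):
        # Never blocks: raises instead, keeping submission async.
        with self._cond:
            if len(self._tasks) == self._max_tasks:
                raise TooManyTasks()
            self._tasks.append(task)
            self._cond.notify()

    def get(self):
        # Blocks until a task is available.
        with self._cond:
            while not self._tasks:
                self._cond.wait()
            return self._tasks.popleft()

    def clear(self):
        # Drop pending tasks so stop() does not have to wait for them.
        with self._cond:
            self._tasks.clear()
```

With such a queue, stop() could clear() pending tasks and then put one sentinel per worker, so waiting workers exit immediately instead of only after draining the backlog.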
Line 74: with self._lock:
Line 75: self._running = False
Line 76: if wait:
Line 77: for worker in self._workers:

Line 72: for worker in self._workers:
Line 73: self._tasks.put(_STOP)
Line 74: with self._lock:
Line 75: self._running = False
Line 76: if wait:
This will deadlock because you hold the lock while exiting threads may try to acquire it.

Also this wait is too dangerous, since a thread which is running a task may get stuck, and a thread which is already running a task may (practically) never exit. We would like to wait only for threads which are not executing any task.

Line 77: for worker in self._workers:
Line 78: self._log.debug('worker %s stopped', worker.name)
Line 79: worker.join()
Line 80: del worker

Line 94: with self._lock:
Line 95: if not self._running:
Line 96: raise ExecutorNotRunning
Line 97: task = BackgroundTask(task, self)
Line 98: self._tasks.put(task)
This may raise, as you released the lock above, so another thread may have just stopped the executor.

Also, adding a task may block if the queue is full. This breaks other code that assumes that dispatch is async - we never want to do this. We should use put_nowait() here to ensure that this never blocks. The caller of dispatch should handle the Queue.Full exception that this raises.

Line 99: return task
Line 100: 
Line 101: @utils.traceback()
Line 102: def _run(self):

Line 108: try:
Line 109: task.execute()
Line 110: except Exception:
Line 111: self._log.exception('Unhandled exception for task')
Line 112: if task.discarded:
This can be simplified by raising in task.execute() if the task was discarded.
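The non-blocking dispatch suggested above might be sketched like this (a fragment, not the patch's actual code; `queue` is the Python 3 name of the Queue module the patch uses, and NotRunning stands in for the patch's exception):

```python
import queue  # Python 3 name of the Queue module used in the patch
import threading


class NotRunning(Exception):
    pass


class Executor(object):
    """Fragment sketch: dispatch never blocks, and the running check
    plus the enqueue happen under the same lock, closing the race
    noted above."""

    def __init__(self, max_tasks):
        self._tasks = queue.Queue(max_tasks)
        self._lock = threading.Lock()
        self._running = True

    def dispatch(self, callable_):
        with self._lock:
            if not self._running:
                raise NotRunning()
            # put_nowait() raises queue.Full instead of blocking the
            # caller; the caller of dispatch must handle that exception.
            self._tasks.put_nowait(callable_)
```

Holding the lock here is cheap because put_nowait() cannot block.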
Line 113: self._log.debug('Worker stopping after task was discarded')
Line 114: break
Line 115: self._log.info('worker stopped')
Line 116: if task.discarded:

Line 112: if task.discarded:
Line 113: self._log.debug('Worker stopping after task was discarded')
Line 114: break
Line 115: self._log.info('worker stopped')
Line 116: if task.discarded:
We don't need this check; stop does not remove workers from the workers list.

Line 117: self._remove_worker(threading.current_thread())
Line 118: # else no action needed: stop(wait=True) will take care.
Line 119: 
Line 120: def _remove_worker(self, worker):

Line 122: self._workers.remove(worker)
Line 123: self._log.debug('worker %s left the executor', worker.name)
Line 124: 
Line 125: def _add_worker(self):
Line 126: worker = threading.Thread(target=self._run)
Name this thread - to make it easy to understand the logs, I would keep a counter and call the threads "Executor-N".

Line 127: worker.daemon = True
Line 128: worker.start()
Line 129: self._workers.append(worker)
Line 130: self._log.debug('worker %s joined the executor', worker.name)
Line 131: 
Line 132: def _replace_worker(self):
Line 133: with self._lock:
Line 134: self._discards += 1
Line 135: self._add_worker()
This should not add a worker if the executor was stopped.

Line 136: 
Line 137: 
Line 138: class BackgroundTask(object):
Line 139: """

Line 134: self._discards += 1
Line 135: self._add_worker()
Line 136: 
Line 137: 
Line 138: class BackgroundTask(object):
A more specific name would be ExecutorTask.

Line 139: """
Line 140: Represents a task submitted to the Executor.
Line 141: The client can
Line 142: - verify if a task is in progress or not

Line 147: """
Line 148: def __init__(self, task, executor):
Line 149: self._task = task
Line 150: self._executor = executor
Line 151: self._thread = None
We don't need the actual thread; keeping a boolean is enough.
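The "Executor-N" naming suggestion could be sketched like this (a fragment with a per-instance counter; names are illustrative):

```python
import itertools
import threading


class Executor(object):
    """Fragment sketch: give each worker thread a readable name,
    "Executor-N", using a per-instance counter."""

    def __init__(self):
        self._workers = set()  # a set, making removal easy
        self._worker_id = itertools.count()

    def _add_worker(self):
        name = "Executor-%d" % next(self._worker_id)
        worker = threading.Thread(target=self._run, name=name)
        worker.daemon = True
        self._workers.add(worker)
        worker.start()
        return worker

    def _run(self):
        pass  # worker loop elided in this sketch
```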
Line 152: self._discarded = False
Line 153: 
Line 154: @property
Line 155: def discarded(self):

Line 159: def in_progress(self):
Line 160: """
Line 161: returns True if this task is in progress, False otherwise.
Line 162: """
Line 163: return self._thread is not None
This may be a task waiting on the queue because all workers are busy, or a task that was already executed. Why do we need this racy check?

Line 164: 
Line 165: def discard(self):
Line 166: """
Line 167: This is to be called once the task is detected as stuck.

Line 172: It is not possible to interrupt a stuck task, thus
Line 173: any side effect will still take place.
Line 174: """
Line 175: self._discarded = True
Line 176: self._executor._replace_worker()
This is racy - if a thread was just executing this task, discarding may be too late, so the thread will continue, and we will add another thread to the pool for no reason. For example:

1. thread 1 is executing the task
2. thread 2 calls discard
3. thread 1 finishes the task
4. thread 2 sets self._discard and invokes _replace_worker

Also, calling an executor protected method is a little ugly. Since we don't care about the executor object, but only about _replace_worker, we can accept the method instead of the object.

Finally, discarding the same task twice will add 2 new workers instead of one.

Line 177: 
Line 178: # task interface
Line 179: 
Line 180: def execute(self):

Line 176: self._executor._replace_worker()
Line 177: 
Line 178: # task interface
Line 179: 
Line 180: def execute(self):
Raising if this task is already executing would be nice - this is impossible and should raise AssertionError.
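The discard points above - take effect only while the task is executing, fire at most once, accept the replace_worker method rather than the executor object, and raise AssertionError on re-entrant execute() - can be combined in one sketch (class and parameter names are illustrative):

```python
import threading


class Discarded(Exception):
    """Raised by execute() when the task was discarded while running."""


class Task(object):
    """Sketch of the discard semantics discussed above."""

    def __init__(self, callable_, replace_worker):
        self._callable = callable_
        self._replace_worker = replace_worker  # a method, not the executor
        self._lock = threading.Lock()
        self._executing = False
        self._discarded = False

    def discard(self):
        with self._lock:
            # No effect if the task is not running right now, or was
            # already discarded - so we never add a replacement worker
            # for no reason, and a double discard cannot add two.
            if self._discarded or not self._executing:
                return
            self._discarded = True
        self._replace_worker()

    def execute(self):
        with self._lock:
            assert not self._executing, "task is already executing"
            self._executing = True
        try:
            self._callable()
        finally:
            with self._lock:
                self._executing = False
                discarded = self._discarded
            self._callable = None  # release the reference for gc
        if discarded:
            # Let the worker loop detect discarding via an exception,
            # simplifying its logic.
            raise Discarded()
```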
Line 181: self._thread = threading.current_thread()
Line 182: try:
Line 183: self._task()
Line 184: finally:

Line 181: self._thread = threading.current_thread()
Line 182: try:
Line 183: self._task()
Line 184: finally:
Line 185: self._thread = None
This must release the reference to the task, since this reference prevents garbage collection of the task until the background task is collected. Also, keeping a reference to the executor makes it harder for Python to collect the executor (executor -> tasks -> task -> executor).

We can simplify the worker thread logic by raising here if the task was discarded while executing.

Line 186: 
Line 187: 

Line 184: finally:
Line 185: self._thread = None
Line 186: 
Line 187: 
Line 188: _STOP = BackgroundTask(None, None)
This should be object() - nobody gets this back and expects the BackgroundTask interface, and we want it to break if someone tries to execute it.

-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 11
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes

_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
