Francesco Romani has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................

Patch Set 11:

(14 comments)

initial response

http://gerrit.ovirt.org/#/c/29191/11/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 28: import logging
Line 29: import Queue
Line 30: import threading
Line 31:
Line 32: import utils
> We can now use relative imports:
Done
Line 33:
Line 34:
Line 35: class ExecutorNotRunning(Exception):
Line 36: """

Line 31:
Line 32: import utils
Line 33:
Line 34:
Line 35: class ExecutorNotRunning(Exception):
> "executor.ExecutorNotRunning" repeats the "Executor" word for no reason. We
Sure, removed the repetition.
Line 36: """
Line 37: Executor not yet started or shutting down.
Line 38: """
Line 39:

Line 37: Executor not yet started or shutting down.
Line 38: """
Line 39:
Line 40:
Line 41: class ExecutorAlreadyStarted(Exception):
> "executor.ExecutorAlreadyStarted" repeats the "Executor" word for no reason
As above.
Line 42: """
Line 43: Executor started multiple times.
Line 44: """
Line 45:

Line 51: """
Line 52:
Line 53: _log = logging.getLogger("Executor")
Line 54:
Line 55: def __init__(self, workers=10, max_tasks=1000):
> I don't think we need defaults, since we don't know how the executor is goi
Good point, removed.
Line 56: self._workers_count = workers
Line 57: self._tasks = Queue.Queue(max_tasks)
Line 58: self._workers = []
Line 59: self._lock = threading.Lock()

Line 54:
Line 55: def __init__(self, workers=10, max_tasks=1000):
Line 56: self._workers_count = workers
Line 57: self._tasks = Queue.Queue(max_tasks)
Line 58: self._workers = []
> Let's keep workers in a set(); it makes it easier to remove them when needed.
Done
Line 59: self._lock = threading.Lock()
Line 60: self._running = False
Line 61: self._discards = 0 # for debug purposes only
Line 62:

Line 65: if self._running:
Line 66: raise ExecutorAlreadyStarted
Line 67: for _ in range(self._workers_count):
Line 68: self._add_worker()
Line 69: self._running = True
> Let's put self._running before starting the threads, which may use this valu
Done
Line 70:
Line 71: def stop(self, wait=True):
Line 72: for worker in self._workers:
Line 73: self._tasks.put(_STOP)

Line 69: self._running = True
Line 70:
Line 71: def stop(self, wait=True):
Line 72: for worker in self._workers:
Line 73: self._tasks.put(_STOP)
> This may block if tasks queue is full, so this is not a good way to stop th
Agreed. Will work on this.
Line 74: with self._lock:
Line 75: self._running = False
Line 76: if wait:
Line 77: for worker in self._workers:

Line 72: for worker in self._workers:
Line 73: self._tasks.put(_STOP)
Line 74: with self._lock:
Line 75: self._running = False
Line 76: if wait:
> This will deadlock because you hold the lock when exiting threads may try t
Will introduce a per-thread 'discarded' flag and wait only for
non-discarded threads, as you suggested in a past comment. Will also
rearchitect stop() to overcome this dangerous behaviour.
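
To make the two stop() concerns above concrete (blocking on a full task
queue, and joining workers while holding the lock), here is one possible
shape. It is only a sketch under stated assumptions, not the code planned
for the next patch set: it targets Python 3 (`queue` instead of `Queue`),
it swaps the bounded queue for an unbounded one with an explicit size
check in dispatch() so the stop sentinels always fit, it leaves out the
per-thread 'discarded' flag, and the names `NotRunning` and `TooManyTasks`
are hypothetical.

```python
import logging
import queue      # assumption: Python 3; the patch itself uses Python 2's Queue
import threading

_STOP = object()  # sentinel, never handed back to callers


class NotRunning(Exception):
    """Executor not yet started or shutting down (hypothetical name)."""


class TooManyTasks(Exception):
    """Task limit reached (hypothetical name)."""


class Executor(object):
    _log = logging.getLogger("Executor")

    def __init__(self, workers, max_tasks):
        self._workers_count = workers
        self._max_tasks = max_tasks
        self._tasks = queue.Queue()   # unbounded: put() never blocks
        self._workers = set()
        self._lock = threading.Lock()
        self._running = False

    def start(self):
        with self._lock:
            self._running = True      # set before any worker thread runs
            for i in range(self._workers_count):
                worker = threading.Thread(target=self._run,
                                          name="executor-%d" % i)
                worker.daemon = True
                self._workers.add(worker)
                worker.start()

    def dispatch(self, func):
        # Check and enqueue under the same lock, so stop() cannot slip in
        # between the two steps.
        with self._lock:
            if not self._running:
                raise NotRunning()
            if self._tasks.qsize() >= self._max_tasks:
                raise TooManyTasks()
            self._tasks.put(func)

    def stop(self, wait=True):
        with self._lock:
            self._running = False
            workers = list(self._workers)  # snapshot, joined outside the lock
        for _ in workers:
            self._tasks.put(_STOP)         # cannot block on an unbounded queue
        if wait:
            for worker in workers:
                worker.join()              # lock is not held here

    def _run(self):
        while True:
            task = self._tasks.get()
            if task is _STOP:
                break
            try:
                task()
            except Exception:
                self._log.exception("Unhandled exception for task")
        # Exiting workers take the lock; this is safe because stop() has
        # already released it before joining.
        with self._lock:
            self._workers.discard(threading.current_thread())
```

Tasks already queued still run before the workers see the sentinel, so
stop(wait=True) returns only after they finish. Whether pending tasks
should be dropped instead is a separate decision this sketch does not make.
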
Line 77: for worker in self._workers:
Line 78: self._log.debug('worker %s stopped', worker.name)
Line 79: worker.join()
Line 80: del worker

Line 94: with self._lock:
Line 95: if not self._running:
Line 96: raise ExecutorNotRunning
Line 97: task = BackgroundTask(task, self)
Line 98: self._tasks.put(task)
> This may raise as you released the lock above, so another thread may have j
Will move this inside the lock and switch to put_nowait.
Line 99: return task
Line 100:
Line 101: @utils.traceback()
Line 102: def _run(self):

Line 108: try:
Line 109: task.execute()
Line 110: except Exception:
Line 111: self._log.exception('Unhandled exception for task')
Line 112: if task.discarded:
> This can be simplified by raising in task.execute() if task was discarded.
Good point, will change.
Line 113: self._log.debug('Worker stopping after task was discarded')
Line 114: break
Line 115: self._log.info('worker stopped')
Line 116: if task.discarded:

Line 122: self._workers.remove(worker)
Line 123: self._log.debug('worker %s left the executor', worker.name)
Line 124:
Line 125: def _add_worker(self):
Line 126: worker = threading.Thread(target=self._run)
> Name this thread - to make it easy to understand the logs, I would keep a
Done
Line 127: worker.daemon = True
Line 128: worker.start()
Line 129: self._workers.append(worker)
Line 130: self._log.debug('worker %s joined the executor', worker.name)

Line 134: self._discards += 1
Line 135: self._add_worker()
Line 136:
Line 137:
Line 138: class BackgroundTask(object):
> A more specific name would be ExecutorTask.
Done
Line 139: """
Line 140: Represents a task submitted to the Executor.
Line 141: The client can
Line 142: - verify if a task is in progress or not

Line 159: def in_progress(self):
Line 160: """
Line 161: returns True if this task is in progress, False otherwise.
Line 162: """
Line 163: return self._thread is not None
> This may be a task waiting on the queue because all workers are busy, or a
No immediate need for this, will remove.
Line 164:
Line 165: def discard(self):
Line 166: """
Line 167: This is to be called once the task is detected as stuck.

Line 184: finally:
Line 185: self._thread = None
Line 186:
Line 187:
Line 188: _STOP = BackgroundTask(None, None)
> This should be object() - nobody gets this back and expects the BackgroundTas
Done

--
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 11
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
