Francesco Romani has posted comments on this change.

Change subject: executor: introduce the executor library
......................................................................
Patch Set 10:

(25 comments)

http://gerrit.ovirt.org/#/c/29191/10/lib/vdsm/executor.py
File lib/vdsm/executor.py:

Line 36:     pass
Line 37:
Line 38:
Line 39: def _COLLECTOR(result):
Line 40:     pass
> Can you move private stuff like this to the end of the file? It is easier t
Sure, will have the public interface first.

Line 41:
Line 42:
Line 43: class NoAvailableWorker(Exception):
Line 44:     """

Line 45:     No available worker to carry the task.
Line 46:     """
Line 47:
Line 48:
Line 49: class PoolNotRunning(Exception):
> Let's go with your choice and name it ExecutorNotRunning
Done

Line 50:     """
Line 51:     Pool not yet started or shutting down.
Line 52:     """
Line 53:

Line 59:     """
Line 60:
Line 61:     _log = logging.getLogger("Executor")
Line 62:
Line 63:     def __init__(self, num_workers=10):
> I think that "workers" or "size" is much more readable than the cryptic num
Picked 'workers'

Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()

Line 61:     _log = logging.getLogger("Executor")
Line 62:
Line 63:     def __init__(self, num_workers=10):
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
> We need to limit the size - you don't want to hold here 1000's of tasks.
Done

Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()
Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only

Line 63:     def __init__(self, num_workers=10):
Line 64:         self._num_workers = num_workers
Line 65:         self._tasks = Queue.Queue()
Line 66:         self._workers = []
Line 67:         self._resize_lock = threading.Lock()
> I would call this _lock, since you need to protect more than the workers li
Done

Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only
Line 70:
Line 71:     def start(self):

Line 68:         self._running = False
Line 69:         self._evictions = 0  # for debug purposes only
Line 70:
Line 71:     def start(self):
Line 72:         with self._resize_lock:
> You should raise if executor is started multiple times.
Done

Line 73:             for _ in range(self._num_workers):
Line 74:                 self._add_worker()
Line 75:             self._running = True
Line 76:

Line 74:                 self._add_worker()
Line 75:             self._running = True
Line 76:
Line 77:     def stop(self):
Line 78:         self._running = False
> Must be inside the lock.
Done

Line 79:         for worker in self._workers:
Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:

Line 79:         for worker in self._workers:
Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:
Line 83:                 self._log.debug('terminating worker: %s', worker.name)
> This is not correct, you are not terminating the worker here. Maybe: ("Work
Picked 'Worker %s stopped'

Line 84:                 worker.join()
Line 85:                 del worker
Line 86:
Line 87:     @property

Line 80:             self._tasks.put((_STOP, (), {}))
Line 81:         with self._resize_lock:
Line 82:             for worker in self._workers:
Line 83:                 self._log.debug('terminating worker: %s', worker.name)
Line 84:                 worker.join()
> This can take a lot of time - should be done only if the caller specified wai
Done

Line 85:                 del worker
Line 86:
Line 87:     @property
Line 88:     def size(self):

Line 84:                 worker.join()
Line 85:                 del worker
Line 86:
Line 87:     @property
Line 88:     def size(self):
> If you go with "size" here, then the __init__ argument should be also "size
No immediate need, so removed.

Line 89:         """
Line 90:         returns the actual size of the worker pool.
Line 91:         """
Line 92:         with self._resize_lock:

Line 91:         """
Line 92:         with self._resize_lock:
Line 93:             return len(self._workers)
Line 94:
Line 95:     def send(self, task, *args, **kwargs):
> send? I don't understand this api.
Ok for the naming. Will use 'dispatch'.
Agreed for *args and **kwargs: no immediate need, let's start without them.

Line 96:         """
Line 97:         send a new task to the pool.
Line 98:
Line 99:         The task may be any callable. extra arguments are

Line 106:         See doc(BackgroundTask.discard)
Line 107:         """
Line 108:         return self.submit(_COLLECTOR, task, *args, **kwargs)
Line 109:
Line 110:     def submit(self, collector, task, *args, **kwargs):
> submit? I don't understand this api.
Well, in retrospect yes, those are ugly names. I needed somewhat similar
names to dispatch a task with and without a collector callable.
But let's just get rid of the collector until there is (if ever) an
objective need for it.

Line 111:         """
Line 112:         submit a task to the pool, with a collector function.
Line 113:
Line 114:         The collector function must be a callable which

Line 122:         The collector will be called only if the task is not
Line 123:         discarded.
Line 124:         See doc(BackgroundTask.discard)
Line 125:         """
Line 126:         if not self._running:
> Should be protected by the lock
Done

Line 127:             raise PoolNotRunning
Line 128:         if not self._workers:
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)

Line 124:         See doc(BackgroundTask.discard)
Line 125:         """
Line 126:         if not self._running:
Line 127:             raise PoolNotRunning
Line 128:         if not self._workers:
> Must be protected by the lock
Done

Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task

Line 128:         if not self._workers:
Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
> Keeping tuples of (task, args, kwargs) means that any object references fro
Agreed to get rid of explicit args and kwargs and pack the state in the object

Line 133:
Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:

Line 129:             raise NoAvailableWorker
Line 130:         task = BackgroundTask(task, self, collector)
Line 131:         self._tasks.put((task, args, kwargs))
Line 132:         return task
Line 133:
> Must be protected with @traceback
Done

Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:
Line 137:             task, args, kwargs = self._tasks.get()

Line 134:     def _run(self):
Line 135:         self._log.info('worker started')
Line 136:         while True:
Line 137:             task, args, kwargs = self._tasks.get()
Line 138:             if task is _STOP:
> This should be:
Done

Line 139:                 break
Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)

Line 138:             if task is _STOP:
Line 139:                 break
Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)
> Should be:
Done

Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')

Line 140:             try:
Line 141:                 with running(task):
Line 142:                     task(*args, **kwargs)
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
> unhandled -> Unhandled
Done

Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')

Line 142:                     task(*args, **kwargs)
Line 143:             except Exception:
Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
> Let's be more clear: "Worker stopping after task was discarded"
Done

Line 147:                 break
Line 148:         self._log.info('worker stopped')
Line 149:
Line 150:     def remove_worker(self, worker):

Line 144:                 self._log.exception('unhandled exception for task')
Line 145:             if task.discarded:
Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')
> I think we should handle removal of workers here:
This will lead to better code, so I will do it.

Line 149:
Line 150:     def remove_worker(self, worker):
Line 151:         "to be used from BackgroundTask only."
Line 152:         with self._resize_lock:

Line 146:                 self._log.debug('worker aborting')
Line 147:                 break
Line 148:         self._log.info('worker stopped')
Line 149:
Line 150:     def remove_worker(self, worker):
> This is a little ugly - people will use this eventually. Just make this _prot
Agreed and done

Line 151:         "to be used from BackgroundTask only."
Line 152:         with self._resize_lock:
Line 153:             if not self._running:
Line 154:                 return  # no point in messing up things if shutting down

Line 166:             worker = threading.Thread(target=self._run)
Line 167:             worker.daemon = True
Line 168:             worker.start()
Line 169:         except Exception:
Line 170:             self._log.exception('cannot create worker')
> Let's not handle this here, and let the caller handle this error.
Done

Line 171:         else:
Line 172:             self._workers.append(worker)
Line 173:             self._log.debug('worker %s joined the pool', worker.name)
Line 174:

Line 186:     def __init__(self, task, executor, collector):
Line 187:         self._task = task
Line 188:         self._executor = executor
Line 189:         self._collector = collector
Line 190:         self._valid = True
> I don't understand why we need a collector. The task should simply run, if
No immediate or obvious need for the collector, just a maybe-nice-to-have
feature. Thus removed and simplified the code.

Line 191:         self._thread = None
Line 192:
Line 193:     @property
Line 194:     def in_progress(self):

Line 217:         self._valid = False
Line 218:         if self.in_progress:
Line 219:             self._executor.remove_worker(self._thread)
Line 220:
Line 221:     # runnable interface: internal usage only
> This is nice, but internal stuff should be _protected.
Agreed and removed.

Line 222:
Line 223:     def start(self):
Line 224:         self._thread = threading.current_thread()
Line 225:

--
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 10
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
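
For reference, here is a rough sketch of the shape the replies above point to: 'workers' as the constructor argument, a bounded task queue, a single _lock, start() raising on a second call, stop() joining only on request, and a plain dispatch(task) without *args/**kwargs or a collector. This is not the code of the next patch set. ExecutorNotRunning is the name agreed above; AlreadyStarted, TooManyTasks, max_tasks and the wait flag are hypothetical names chosen only for illustration. The sketch uses the Python 3 queue module, while the patch itself targets Python 2 (Queue).

```python
import logging
import queue
import threading

_STOP = object()  # sentinel telling a worker to leave its loop


class ExecutorNotRunning(Exception):
    """Executor not yet started or shutting down."""


class AlreadyStarted(Exception):
    """Hypothetical name: start() was called on a running executor."""


class TooManyTasks(Exception):
    """Hypothetical name: the bounded task queue is full."""


class Executor(object):

    _log = logging.getLogger("Executor")

    def __init__(self, workers=10, max_tasks=100):
        # max_tasks is an assumed parameter; the review only asks
        # for the queue to be limited in size.
        self._workers_count = workers
        self._tasks = queue.Queue(max_tasks)
        self._workers = []
        self._lock = threading.Lock()
        self._running = False

    def start(self):
        with self._lock:
            if self._running:
                raise AlreadyStarted()
            self._running = True
            for _ in range(self._workers_count):
                worker = threading.Thread(target=self._run)
                worker.daemon = True
                worker.start()
                self._workers.append(worker)

    def stop(self, wait=True):
        # The state change happens under the lock; joining may take
        # a long time, so it is done outside the lock and only if the
        # caller asked for it.
        with self._lock:
            self._running = False
            workers, self._workers = self._workers, []
        for _ in workers:
            self._tasks.put(_STOP)
        if wait:
            for worker in workers:
                worker.join()

    def dispatch(self, task):
        # task is any callable taking no arguments; state is packed
        # into the callable instead of (task, args, kwargs) tuples.
        with self._lock:
            if not self._running:
                raise ExecutorNotRunning()
            try:
                self._tasks.put_nowait(task)
            except queue.Full:
                raise TooManyTasks()

    def _run(self):
        while True:
            task = self._tasks.get()
            if task is _STOP:
                break
            try:
                task()
            except Exception:
                self._log.exception("Unhandled exception in task")
```

Discarding stuck tasks and replacing their workers is left out on purpose, to keep the sketch focused on the locking and naming points raised in the comments.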
