Nir Soffer has posted comments on this change.

Change subject: threadpool: add a thread pool with watchdog
......................................................................

Patch Set 8:

(11 comments)

On first look at the worker module, this is over-engineered and too
complex. From the description, we need a simple pool of threads with an
extra supervisor thread to handle stuck tasks. We need the simplest thread
pool that is good only for what vdsm needs and nothing else.

http://gerrit.ovirt.org/#/c/29191/8/lib/threadpool/worker.py
File lib/threadpool/worker.py:

Line 25:
Line 26: import schedqueue
Line 27:
Line 28:
Line 29: class TimeTrackingThread(threading.Thread):

Please don't add unneeded inheritance. If we need time tracking, add it to
the worker thread class. And especially do not inherit from Thread -
instead keep a thread instance variable. This allows us to implement the
interface that we like to have, and we do not depend on Thread.

Line 30:     """
Line 31:     Thin mixin to help a worker thread to track the time
Line 32:     consumed by a work item.
Line 33:     """

Line 37:         self._working_on = None
Line 38:         self._timefunc = timefunc
Line 39:
Line 40:     @property
Line 41:     def busy(self):

Not related to time tracking.

Line 42:         """
Line 43:         is the thread doing some work or is it idle?
Line 44:         """
Line 45:         return self._work_started_at is not None

Line 55:     def task(self):
Line 56:         """
Line 57:         what is this worker doing?
Line 58:         """
Line 59:         return self._working_on

Naming it _task is less surprising.

Line 60:
Line 61:     @contextlib.contextmanager
Line 62:     def track_time(self, tag):
Line 63:         """

Line 67:         self._working_on = tag
Line 68:         self._work_started_at = self._timefunc()
Line 69:         yield
Line 70:         self._work_started_at = None
Line 71:         self._working_on = None

Not needed. The _do_work of the worker can simply do:

    start = self.clock.time()
    try:
        run task...
    finally:
        self.elapsed = self.clock.time() - start

If you want a generic time tracking utility, put it in utils.py:

    class StopWatch:

        def __init__(self):
            self.start = None
            self.elapsed = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *args):
            self.elapsed = time.time() - self.start

    with StopWatch() as stopwatch:
        do stuff...

And use it:

    logging.debug("did stuff in %.2f seconds", stopwatch.elapsed)

We don't need a complex class hierarchy for this.

Line 72:
Line 73:
Line 74: class Worker(TimeTrackingThread):
Line 75:     """

Line 79:     """
Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue

What's wrong with the builtin Queue?

Line 84:         self._stop = threading.Event()
Line 85:
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)

Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()

This allows stopping immediately - but usually you want to stop when the
work in the queue is done. Also, this adds another lock and condition for
each worker, while we can have only a single lock and condition for the
entire thread pool if we use the method I describe below.

To stop a worker, put a sentinel object into the queue:

    _STOP = object()

In ThreadPool:

    def stop_some_worker(self):
        self._queue.put(_STOP)

In Worker:

    def _run(self):
        while self._running:
            self._process()

    def _process(self):
        it = self._queue.get()
        if it is _STOP:
            self._running = False
            return
        process it...

If I want to stop N workers, I add N _STOP objects into the queue. If I
want the workers to stop only after the queued work is done, I can add the
_STOP objects at the end of the queue. If I want them to stop as soon as
the current work is done, I can add them at the front of the queue.

We may need to cancel a stuck thread, but for this the stop event does not
help.
We need a way to unblock a stuck task in a way that will raise an
exception that causes the thread to exit cleanly, or just fail the current
task. For example, if the current task is stuck on a socket, we can make
it fail and let the thread continue to process the next request.

Line 85:
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88:

Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()
Line 85:

Use @traceback

Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88:
Line 89:         while not self._stop.is_set():

Line 95:         """
Line 96:         execute a single try of work processing.
Line 97:         """
Line 98:         with self._pull() as item:
Line 99:             if item is not None:

Why do you want to support None?

Line 100:                 self._do_work(item)
Line 101:
Line 102:     def stop(self):
Line 103:         """

Line 115:             yield item
Line 116:         except schedqueue.NotYet:
Line 117:             yield None
Line 118:         except schedqueue.Empty:
Line 119:             yield None

Unneeded complexity.

Line 120:
Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.

Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.
Line 124:         """
Line 125:         tag, work, args, kwargs = item

work is a function or callable - why not use an obvious name?

Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:

Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:
Line 130:                 logging.exception('work failed', exc_info=True)

Don't pass exc_info here; logging.exception logs a traceback anyway.

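For reference, the sentinel-based shutdown suggested in the comment on line
84 could look roughly like the sketch below. This is only an illustration
under assumptions, not code from the patch: it uses the Python 3 `queue`
module, the class and method names (ThreadPool, Worker, submit, stop) are
hypothetical, and the worker keeps a Thread instance instead of inheriting
from Thread. With a plain FIFO queue the sentinels are appended at the back,
so workers exit only after the items queued ahead of them are processed.

```python
import logging
import queue
import threading

_STOP = object()  # sentinel: a worker exits when it dequeues this object


class Worker(object):
    """Hypothetical worker that owns a thread instead of subclassing Thread."""

    def __init__(self, work_queue):
        self._queue = work_queue
        self._running = False
        self._thread = threading.Thread(target=self._run)
        self._thread.daemon = True

    def start(self):
        self._running = True
        self._thread.start()

    def join(self, timeout=None):
        self._thread.join(timeout)

    def _run(self):
        while self._running:
            self._process()

    def _process(self):
        item = self._queue.get()  # blocks until an item is available
        if item is _STOP:
            self._running = False
            return
        func, args, kwargs = item
        try:
            func(*args, **kwargs)
        except Exception:
            # logging.exception already includes the traceback
            logging.exception('task failed')


class ThreadPool(object):
    """Hypothetical minimal pool: one shared queue, N workers."""

    def __init__(self, size):
        self._queue = queue.Queue()
        self._workers = [Worker(self._queue) for _ in range(size)]

    def start(self):
        for worker in self._workers:
            worker.start()

    def submit(self, func, *args, **kwargs):
        self._queue.put((func, args, kwargs))

    def stop(self):
        # One sentinel per worker; each worker consumes exactly one and exits.
        for _ in self._workers:
            self._queue.put(_STOP)
        for worker in self._workers:
            worker.join()


# Usage: queued tasks run before the workers see the sentinels.
results = []
pool = ThreadPool(2)
pool.start()
for i in range(5):
    pool.submit(results.append, i)
pool.stop()
```

Note that this only covers orderly shutdown. It does nothing for a task
that is stuck, which still needs a separate way to be unblocked or failed.
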
--
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 8
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
