Francesco Romani has posted comments on this change.

Change subject: threadpool: add a thread pool with watchdog
......................................................................
Patch Set 8:

(8 comments)

Nir, thanks for the review. First batch of answers, more to come.

http://gerrit.ovirt.org/#/c/29191/8/lib/threadpool/worker.py
File lib/threadpool/worker.py:

Line 25:
Line 26: import schedqueue
Line 27:
Line 28:
Line 29: class TimeTrackingThread(threading.Thread):
> Please don't add unneeded inheritance. If we need time tracking, add it to
Good points, will change.

Line 30:     """
Line 31:     Thin mixin to help a worker thread to track the time
Line 32:     consumed by a work item.
Line 33:     """

Line 37:         self._working_on = None
Line 38:         self._timefunc = timefunc
Line 39:
Line 40:     @property
Line 41:     def busy(self):
> Not related to time tracking
Will move it away from here, probably into the worker thread.

Line 42:         """
Line 43:         is the thread doing some work or is it idle?
Line 44:         """
Line 45:         return self._work_started_at is not None

Line 55:     def task(self):
Line 56:         """
Line 57:         what is this worker doing?
Line 58:         """
Line 59:         return self._working_on
> naming it _task is less surprising
Agreed. Will change.

Line 60:
Line 61:     @contextlib.contextmanager
Line 62:     def track_time(self, tag):
Line 63:         """

Line 67:         self._working_on = tag
Line 68:         self._work_started_at = self._timefunc()
Line 69:         yield
Line 70:         self._work_started_at = None
Line 71:         self._working_on = None
> Not needed. The _do_work of the worker can simply do:
Agreed on the general concept, and agreed that this can be simplified: will do.

I cannot adopt your suggestion directly, because neither the worker thread nor the supervisor needs to know how much time a task took. Instead, the supervisor checks asynchronously how much time a task is *taking* (while it is still in progress) in order to detect stuck tasks (= tasks being processed for longer than a threshold).

There is definitely room for simplification, though, and I will do that.
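To illustrate the distinction between time *taken* and time *being taken*, here is a minimal sketch. It is not the code in this patch: the class name TrackedWorker, the methods do_work() and stuck_task(), and the timeout parameter are all hypothetical names chosen for the example. It assumes the worker only records when the current task started, and that a supervisor thread polls stuck_task() from outside.

```python
import threading
import time


class TrackedWorker(object):
    """Hypothetical worker-side bookkeeping for a watchdog (sketch only)."""

    def __init__(self, timefunc=time.time):
        self._timefunc = timefunc
        self._lock = threading.Lock()
        self._task = None
        self._started_at = None

    def do_work(self, task, work, *args, **kwargs):
        # Record what we are working on and since when.
        with self._lock:
            self._task = task
            self._started_at = self._timefunc()
        try:
            work(*args, **kwargs)
        finally:
            # Always reset, even if the work raised, so a failed task
            # is never reported as stuck afterwards.
            with self._lock:
                self._task = None
                self._started_at = None

    def stuck_task(self, timeout):
        """
        Called by the supervisor: return the task in progress if it has
        been running for more than timeout seconds, None otherwise.
        """
        with self._lock:
            if self._started_at is None:
                return None  # idle
            if self._timefunc() - self._started_at > timeout:
                return self._task
            return None
```

The worker never computes a duration; only the supervisor compares the start time against the clock, and only while the task is still running. The try/finally also covers the case where the work raises, which a bare yield inside a context manager would not.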
Line 72:
Line 73:
Line 74: class Worker(TimeTrackingThread):
Line 75:     """

Line 79:     """
Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
> What's wrong with the builtin Queue?
The standard library provides a nice PriorityQueue, which is almost what I need here, except that it has no notion of time, which I need in order to implement the periodic tasks for interval sampling (poll X every Y seconds).

On the other hand, there is the sched module:
https://docs.python.org/2/library/sched.html
which has the notion of time, but it is not thread safe and has a blocking interface (see the delayfunc argument).

I need to blend these concepts together somehow, hence the workqueue/schedqueue classes.

Line 84:         self._stop = threading.Event()
Line 85:
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)

Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()
Line 85:
> Use @traceback
Will do.

Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88:
Line 89:         while not self._stop.is_set():

Line 115:             yield item
Line 116:         except schedqueue.NotYet:
Line 117:             yield None
Line 118:         except schedqueue.Empty:
Line 119:             yield None
> Unneeded complexity.
I have to agree. All this dance is done to let the queue transparently reinject a periodic task into itself. I'm starting to think I should simply do this in the worker thread and save quite some complexity.

Line 120:
Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.

Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:
Line 130:                 logging.exception('work failed', exc_info=True)
> Don't use the non-standard exc_info, exception logs a traceback anyway.
Will do.

--
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 8
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
