Nir Soffer has posted comments on this change.

Change subject: threadpool: add a thread pool with watchdog
......................................................................

Patch Set 8:

(3 comments)

http://gerrit.ovirt.org/#/c/29191/8/lib/threadpool/worker.py
File lib/threadpool/worker.py:

Line 67:         self._working_on = tag
Line 68:         self._work_started_at = self._timefunc()
Line 69:         yield
Line 70:         self._work_started_at = None
Line 71:         self._working_on = None
> Agreed on the general concept -and agreed that this can be simplified: will

I'm not sure that detecting a stuck thread by tracking time will work. Different tasks have different timeouts, so the stuck state must be a state of the task, not of the worker thread or the supervisor.

We can use a supervisor thread that checks task state and tries to cancel/remove the thread when the task reports that it is stuck. The task will have to track time if its stuck state depends on a timeout.

Another solution may be that each task has a user waiting on the task (e.g. waiting on an Event object). If the user of the task times out, it can cancel the task, which can raise a Canceled exception, waking up the thread and dropping the task.

If we go this way, we don't need to change the size of the thread pool while it is running, because a thread cannot get stuck.

I suggest we don't design a generic thread pool but build a libvirt connection pool, which is what we currently need. When this is done, and if we have duplicate code, we can remove the duplication by creating a common thread pool.

Line 72:
Line 73:
Line 74: class Worker(TimeTrackingThread):
Line 75:     """


Line 79:     """
Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
> The standard library provides a nice priorityqueue, which is almost what I

Why do you need a priorityqueue or a scheduler?

Line 84:         self._stop = threading.Event()
Line 85:
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)


Line 115:             yield item
Line 116:         except schedqueue.NotYet:
Line 117:             yield None
Line 118:         except schedqueue.Empty:
Line 119:             yield None
> I have to agree.
> All this dance is done to let the queue transparently rein

I don't understand why you need to inject an event into the queue.

I would implement the supervisor as a separate, unrelated thread, using a simple loop and Event.wait(timeout) to check all workers and to notice the stop request.

    import threading

    class SupervisorThread(object):

        def __init__(self, interval, workers):
            self.interval = interval
            self._workers = workers
            self._event = threading.Event()
            self._thread = threading.Thread(target=self._run)
            self._thread.daemon = True

        def start(self):
            self._thread.start()

        def stop(self):
            self._event.set()

        def _run(self):
            while not self._event.is_set():
                # Iterate over a copy; the list may change meanwhile.
                for worker in self._workers[:]:
                    if worker.is_stuck():
                        worker.cancel()
                # Sleeps for interval, but wakes up at once on stop().
                self._event.wait(self.interval)

Line 120:
Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 8
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
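
A rough sketch of the second alternative raised in the first comment: a caller waits on the task through an Event and cancels it when the wait times out. Every name here (Task, Canceled, check_canceled, slow_work) is hypothetical and not taken from the patch. It also assumes cancellation is cooperative, so the work function must poll for it; a call blocked inside libvirt would need its own abort mechanism, which this does not show.

```python
import threading
import time


class Canceled(Exception):
    """Raised inside a task after its caller gave up (hypothetical name)."""


class Task(object):
    """A unit of work that a caller can wait on and cancel (sketch only)."""

    def __init__(self, func):
        self._func = func
        self._done = threading.Event()
        self._canceled = threading.Event()
        self.result = None
        self.error = None

    def check_canceled(self):
        # Cooperative cancellation point, called by the work function
        # between steps. Code blocked in a C call will not reach it.
        if self._canceled.is_set():
            raise Canceled()

    def run(self):
        # Executed by a worker thread; always signals completion.
        try:
            self.result = self._func(self)
        except Canceled as e:
            self.error = e
        finally:
            self._done.set()

    def cancel(self):
        self._canceled.set()

    def wait(self, timeout):
        # True if the task finished in time; otherwise cancel it and
        # return False, so the worker drops the task and becomes free.
        if self._done.wait(timeout):
            return True
        self.cancel()
        return False


def slow_work(task):
    # Stand-in for a long operation, split into small steps (about 1s).
    for _ in range(100):
        task.check_canceled()
        time.sleep(0.01)
    return "finished"


if __name__ == "__main__":
    task = Task(slow_work)
    worker = threading.Thread(target=task.run)
    worker.daemon = True
    worker.start()
    finished = task.wait(0.05)   # caller gives up long before 1s
    worker.join(2)
    print(finished, type(task.error).__name__)
```

With this shape the timeout lives with the caller of each task, so different tasks can use different timeouts, and the pool itself does not have to track time or change size.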
