Francesco Romani has posted comments on this change.

Change subject: threadpool: add a thread pool with watchdog
......................................................................
Patch Set 8:

(5 comments)

Added the missing answer. I will amend the docs to describe how this code is supposed to be used and what it is supposed to replace, so we (myself included) can have a clearer vision of the requirements.

http://gerrit.ovirt.org/#/c/29191/8/lib/threadpool/worker.py
File lib/threadpool/worker.py:

Line 79:     """
Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
> Why do you need priorityqueue or scheduler?
To implement the periodic tasks. The worker wakes up periodically and checks:
1. is there new work to do?
2. if so, is the work due now or in the future?

This approach lets the sampling code submit a periodic task to update the Vm stats just once, and lets the pool handle the rest.

I think the whole periodic-task design, and how this code is supposed to be used in sampling, deserves a more detailed description, which I'm going to add to http://gerrit.ovirt.org/#/c/29190
Line 84:         self._stop = threading.Event()
Line 85:
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)

Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()
> This allows stopping immediately - but usually you want to stop when work i
Your solution to stop threads is more general and nicer, so I'll switch to it.

Cancelling a stuck thread is harder. We'll need a connection pool, or one connection per worker thread; I'm not sure which solution is cleaner given our needs here. We'll also need to make sure it is safe and supported for a thread to, e.g., close the connection of a different thread. I'm not sure about that; I need to check with libvirt.
Line 85:
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88:

Line 95:         """
Line 96:         execute a single try of work processing.
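The "is the work due now or in the future?" check can be sketched with a heap ordered by due time. This is a minimal, hypothetical sketch, not the code under review: the `ScheduledQueue` class and its `put`/`get_nowait` methods are assumptions for illustration; only the `NotYet` and `Empty` exception names echo the `schedqueue` module visible in the patch context. The clock is injectable so the behaviour can be checked deterministically.

```python
import heapq
import itertools
import threading
import time


class NotYet(Exception):
    """Raised when the earliest queued task is scheduled in the future."""


class Empty(Exception):
    """Raised when no task is queued at all."""


class ScheduledQueue:
    """Hypothetical thread-safe queue that hands out tasks only once they are due."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._heap = []
        # Tie-breaker: tasks with equal due times come out in insertion order,
        # and the callables themselves never need to be compared.
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def put(self, func, delay=0.0):
        """Queue `func` to become due `delay` seconds from now."""
        due = self._clock() + delay
        with self._lock:
            heapq.heappush(self._heap, (due, next(self._counter), func))

    def get_nowait(self):
        """Return the earliest task if it is due; otherwise raise NotYet or Empty."""
        with self._lock:
            if not self._heap:
                raise Empty()
            due, _, func = self._heap[0]
            if due > self._clock():
                raise NotYet()
            heapq.heappop(self._heap)
            return func
```

In a worker loop, a `NotYet` or `Empty` result could be handled by waiting briefly on the worker's own `threading.Event` (e.g. `self._stop.wait(timeout)`) before polling again, so the thread still reacts promptly to a stop request.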
Line 97:         """
Line 98:         with self._pull() as item:
Line 99:             if item is not None:
> Why do you want to support None?
It is a relic of the automatic queue reinjection feature implemented in the WorkQueue class, a feature that I now think just adds complexity with little if any benefit. To make a long story short: I will remove it and simplify.
Line 100:                 self._do_work(item)
Line 101:
Line 102:     def stop(self):
Line 103:         """

Line 115:             yield item
Line 116:         except schedqueue.NotYet:
Line 117:             yield None
Line 118:         except schedqueue.Empty:
Line 119:             yield None
> I don't understand why you need to inject event into the queue. I would imp
Nice supervisor code; I'll try to come up with something like that (and, again, simplify things deeply).

For the events: I reinject them to implement the periodicity. Let's consider a trivial example: sample CPU stats for each VM. New code just submits a request to the pool in the form (interval, callable). The pool knows it has to run the callable every <interval> seconds, and this preserves the current semantics of VmStatsThread, which issues the underlying libvirt call every <interval> seconds.

But we want to get rid of these VmStatsThread instances, so either we need periodic tasks, which the pool renews automatically, or some other code has to issue the sampling calls periodically. That could be a scheduler thread, for example, but I don't really like that option. I felt that periodic tasks could be a useful and meaningful addition, and this is why I chose this route. Surely it can be greatly simplified.

In any case, I'll add some notes describing in more detail, with references, the case for periodic tasks and task reinjection, so everyone has a clear vision of the requirement and of the code to be replaced, and we can then debate which is the best solution.
Line 120:
Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.

Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.
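The (interval, callable) model described above, where the pool itself renews each periodic task after running it, can be sketched as follows. This is a single-threaded, hypothetical sketch: `PeriodicScheduler`, `submit` and `run_due` are illustrative names, not part of the patch, and in the pool under review the worker threads would perform the `run_due` step. Skipping missed runs when the scheduler falls behind is a design assumption, not something the review specifies.

```python
import heapq
import itertools
import logging
import time


class PeriodicScheduler:
    """Hypothetical sketch: run each submitted callable every `interval`
    seconds by reinjecting it into the queue after every execution."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal due times

    def submit(self, interval, func):
        """Register `func` to run now and then every `interval` seconds."""
        if interval <= 0:
            raise ValueError("interval must be positive")
        self._push(self._clock(), interval, func)

    def _push(self, due, interval, func):
        heapq.heappush(self._heap, (due, next(self._counter), interval, func))

    def run_due(self):
        """Run every task whose due time has passed; return how many ran."""
        ran = 0
        now = self._clock()
        while self._heap and self._heap[0][0] <= now:
            due, _, interval, func = heapq.heappop(self._heap)
            try:
                func()
            except Exception:
                # Like the worker under review, one failing task must not
                # stop the others.
                logging.exception("periodic task %r failed", func)
            # Reinjection: keep a fixed cadence relative to the previous due time...
            next_due = due + interval
            if next_due <= now:
                # ...but if we fell behind, skip the missed runs instead of bursting.
                next_due = now + interval
            self._push(next_due, interval, func)
            ran += 1
        return ran
```

With this shape, sampling code would call something like `scheduler.submit(2.0, sample_cpu_stats)` once (`sample_cpu_stats` being a hypothetical sampling function), and no per-VM thread is needed to keep the sampling going.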
Line 124:         """
Line 125:         tag, work, args, kwargs = item
> work is a function or callable - why not call use an obvious name?
No good reason. Will change.
Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:

--
To view, visit http://gerrit.ovirt.org/29191

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 8
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
