Francesco Romani has posted comments on this change.

Change subject: threadpool: add a thread pool with watchdog
......................................................................


Patch Set 8:

(5 comments)

Added the missing answer. I will amend the docs to describe how this code is
supposed to be used and what it is supposed to replace, so that we (myself
included) can have a clearer view of the requirements.

http://gerrit.ovirt.org/#/c/29191/8/lib/threadpool/worker.py
File lib/threadpool/worker.py:

Line 79:     """
Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
> Why do you need priorityqueue or scheduler?
To implement the periodic tasks.

The worker wakes up periodically and checks:
1. is there new work to do?
2. if so, is the work due now or in the future?

This approach lets the sampling code submit a periodic task to update the VM
stats just once, and lets the pool handle the rest.
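A minimal sketch of the due-time check described above, assuming a heap ordered by absolute due time. `ScheduledQueue`, `Empty` and `NotYet` are hypothetical stand-ins modeled on the `schedqueue` names visible in the patch; the real module may behave differently. The clock is injectable so the check can be exercised without sleeping.

```python
import heapq
import itertools
import threading
import time


class Empty(Exception):
    """No work is queued at all."""


class NotYet(Exception):
    """Work is queued, but the earliest item is not due yet."""


class ScheduledQueue(object):
    """Hypothetical queue that only hands out items whose due time has come."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self._heap = []
        # Tie-breaker: items with equal due times are never compared directly.
        self._counter = itertools.count()
        self._lock = threading.Lock()

    def put(self, item, delay=0.0):
        due = self._clock() + delay
        with self._lock:
            heapq.heappush(self._heap, (due, next(self._counter), item))

    def get_nowait(self):
        """Answer the worker's two questions: any work? is it due now?"""
        with self._lock:
            if not self._heap:
                raise Empty()
            due, _, item = self._heap[0]
            if due > self._clock():
                raise NotYet()
            heapq.heappop(self._heap)
            return item
```

A worker would call `get_nowait()` on each wakeup and simply go back to sleep on either exception.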

I think the whole periodic task concept, and how this code is supposed to be
used in sampling, deserve a more detailed description, which I'm going to add to
http://gerrit.ovirt.org/#/c/29190
Line 84:         self._stop = threading.Event()
Line 85: 
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)


Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()
> This allows stopping immediately - but usually you want to stop when work i
Your solution to stop threads is more general and nicer, so I'll switch to it.
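The reviewer's proposal is truncated above, so the following is not necessarily that code: it is a sketch of the common "sentinel" (poison pill) pattern for stopping workers only after the queued work is done. `_STOP`, `stop_all` and the `(func, args)` item shape are hypothetical.

```python
import logging
import queue
import threading

# Unique sentinel object: it cannot collide with a real work item.
_STOP = object()


class Worker(threading.Thread):
    def __init__(self, work_queue, results):
        super(Worker, self).__init__()
        self.daemon = True
        self._work_queue = work_queue
        self._results = results

    def run(self):
        while True:
            item = self._work_queue.get()  # blocks; no polling, no None items
            try:
                if item is _STOP:
                    # FIFO order: everything queued before the sentinel
                    # has already been taken by some worker.
                    return
                func, args = item
                self._results.append(func(*args))
            except Exception:
                logging.exception('work item failed')
            finally:
                self._work_queue.task_done()


def stop_all(work_queue, workers):
    # One sentinel per worker: each worker consumes exactly one and exits.
    for _ in workers:
        work_queue.put(_STOP)
    for worker in workers:
        worker.join()
```

Unlike a `threading.Event`, the sentinel waits its turn behind pending work, which is the "stop when work is done" behavior.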

Cancelling a stuck thread is harder. We'll need either a connection pool or one
connection per worker thread; I'm not sure which solution is cleaner given our
needs here.
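One way the "one connection per worker thread" option could look, as a sketch using `threading.local`. `connect()` is a hypothetical placeholder returning a dict, not a libvirt call; whether a real connection may be closed from another thread is exactly the open question below.

```python
import itertools
import threading

_ids = itertools.count()


def connect():
    """Hypothetical stand-in for opening a real (e.g. libvirt) connection."""
    return {'id': next(_ids), 'owner': threading.current_thread().name}


class PerThreadConnection(object):
    """Lazily opens, then reuses, one connection per calling thread."""

    def __init__(self, factory):
        self._factory = factory
        self._local = threading.local()  # attributes are private to each thread

    def get(self):
        conn = getattr(self._local, 'conn', None)
        if conn is None:
            conn = self._local.conn = self._factory()
        return conn
```

The appeal of this layout is isolation: a stuck call ties up only its own worker's connection, so the pool can abandon it without touching the others.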

We'll also need to make sure it is safe and supported for a thread to, e.g.,
close the connection of a different thread. I'm not sure about that; I need to
check with the libvirt developers.
Line 85: 
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88: 


Line 95:         """
Line 96:         execute a single try of work processing.
Line 97:         """
Line 98:         with self._pull() as item:
Line 99:             if item is not None:
> Why do you want to support None?
It is a relic of the automatic queue reinjection feature implemented in the
WorkQueue class, a feature that I now think just adds complexity with little, if
any, benefit.
To make a long story short: I will remove it and simplify.
Line 100:                 self._do_work(item)
Line 101: 
Line 102:     def stop(self):
Line 103:         """


Line 115:                 yield item
Line 116:         except schedqueue.NotYet:
Line 117:             yield None
Line 118:         except schedqueue.Empty:
Line 119:             yield None
> I don't understand why you need to inject event into the queue. I would imp
Nice supervisor code; I'll try to come up with something like that (and, again,
simplify things considerably).
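The supervisor code mentioned here is not quoted in this email, so this is only a sketch of one possible watchdog for the pool: workers report when they start and finish a task, and a supervisor thread periodically asks which workers have been busy longer than a timeout. `Watchdog`, `supervise` and `on_stuck` are hypothetical names; what to do with a stuck worker (log it, replace it) is left to the caller.

```python
import threading
import time


class Watchdog(object):
    """Hypothetical tracker of how long each worker has been busy."""

    def __init__(self, timeout, clock=time.monotonic):
        self._timeout = timeout
        self._clock = clock
        self._busy = {}  # worker name -> (task tag, start time)
        self._lock = threading.Lock()

    def started(self, worker_name, tag):
        with self._lock:
            self._busy[worker_name] = (tag, self._clock())

    def finished(self, worker_name):
        with self._lock:
            self._busy.pop(worker_name, None)

    def stuck(self):
        """Return sorted (worker_name, tag) pairs busy longer than the timeout."""
        now = self._clock()
        with self._lock:
            return sorted((name, tag)
                          for name, (tag, start) in self._busy.items()
                          if now - start > self._timeout)


def supervise(watchdog, interval, stop_event, on_stuck):
    # Event.wait() doubles as an interruptible sleep: it returns True
    # (ending the loop) as soon as stop_event is set.
    while not stop_event.wait(interval):
        for name, tag in watchdog.stuck():
            on_stuck(name, tag)
```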

As for the events: I reinject them to implement periodicity. Consider a trivial
example: sampling the CPU stats of each VM. The new code just submits a request
to the pool in the form (interval, callable).

The pool knows it has to run the callable every <interval> seconds, and that
preserves the current semantics of VmStatsThread, which issues the underlying
libvirt call every <interval> seconds.
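A single-threaded sketch of the reinjection idea, assuming requests of the form (interval, callable) as described above. `PeriodicScheduler` and `run_due` are hypothetical; in the real pool the pop and the re-push would happen in worker threads. The next due time is derived from the previous due time rather than from "now", so late runs do not make the schedule drift, and missed slots are skipped instead of replayed in a burst.

```python
import heapq
import itertools
import logging


class PeriodicScheduler(object):
    """Hypothetical: run each callable every `interval` seconds by
    reinjecting it into the queue after each run."""

    def __init__(self):
        self._heap = []
        self._seq = itertools.count()  # tie-breaker for equal due times

    def submit(self, interval, func, now):
        if interval <= 0:
            raise ValueError('interval must be positive')
        heapq.heappush(self._heap, (now + interval, next(self._seq), interval, func))

    def run_due(self, now):
        """Run every task due at `now`, reinject it, return how many ran."""
        ran = 0
        while self._heap and self._heap[0][0] <= now:
            due, _, interval, func = heapq.heappop(self._heap)
            try:
                func()
            except Exception:
                logging.exception('periodic task failed')
            # First slot strictly after `now`, aligned to the original schedule.
            missed = int((now - due) // interval)
            next_due = due + (missed + 1) * interval
            heapq.heappush(self._heap, (next_due, next(self._seq), interval, func))
            ran += 1
        return ran
```

The sampling code calls `submit()` once; from then on the pool keeps the task alive, which is what removes the need for a dedicated VmStatsThread.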

But we want to get rid of these VmStatsThreads, so either we need periodic
tasks, which the pool renews automatically, or some other code has to issue the
sampling calls periodically. That could be a scheduler thread, for example, but
I don't really like that approach.

I felt that periodic tasks could be a useful and meaningful addition, and this
is the reason why I chose this route.
It can surely be greatly simplified.

In any case, I'll add some notes describing in more detail, with references,
the case for periodic tasks and task reinjection, so that everyone has a clear
view of the requirements and of the code to be replaced; we can then debate
which solution is best.
Line 120: 
Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.


Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.
Line 124:         """
Line 125:         tag, work, args, kwargs = item
> work is a function or callable - why not call use an obvious name?
No good reason. Will change.
Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:
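A sketch of `_do_work` after the agreed rename (`work` becomes `func`), together with one possible `track_time` context manager. The quoted code does not show how `track_time` is implemented or what the `except Exception:` branch does, so the `timings` dict and the `logging.exception` call are assumptions made for illustration; the threading parts of the class are omitted.

```python
import contextlib
import logging
import time


class Worker(object):
    """Only the parts relevant to _do_work; threading is omitted."""

    def __init__(self, clock=time.monotonic):
        self._clock = clock
        self.timings = {}  # hypothetical: tag -> duration of the last run

    @contextlib.contextmanager
    def track_time(self, tag):
        start = self._clock()
        try:
            yield
        finally:
            # Recorded even if the task raised.
            self.timings[tag] = self._clock() - start

    def _do_work(self, item):
        """process a single work unit."""
        tag, func, args, kwargs = item
        with self.track_time(tag):
            try:
                func(*args, **kwargs)
            except Exception:
                # Keep the worker alive; one bad task must not kill the thread.
                logging.exception('unhandled error in task %s', tag)
```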


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 8
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
