Francesco Romani has posted comments on this change.

Change subject: threadpool: add a thread pool with watchdog
......................................................................


Patch Set 8:

(8 comments)

Nir, thanks for the review. First batch of answers, more to come.

http://gerrit.ovirt.org/#/c/29191/8/lib/threadpool/worker.py
File lib/threadpool/worker.py:

Line 25: 
Line 26: import schedqueue
Line 27: 
Line 28: 
Line 29: class TimeTrackingThread(threading.Thread):
> Please don't add unneeded inheritance. If we need time tracking, add it to 
Good points, will change.
Line 30:     """
Line 31:     Thin mixin to help a worker thread to track the time
Line 32:     consumed by a work item.
Line 33:     """


Line 37:         self._working_on = None
Line 38:         self._timefunc = timefunc
Line 39: 
Line 40:     @property
Line 41:     def busy(self):
> Not related to time tracking
Will move it out of here, probably into the worker thread.
Line 42:         """
Line 43:         is the thread doing some work or is it idle?
Line 44:         """
Line 45:         return self._work_started_at is not None


Line 55:     def task(self):
Line 56:         """
Line 57:         what is this worker doing?
Line 58:         """
Line 59:         return self._working_on
> naming it _task is less surprising
Agreed, will change.
Line 60: 
Line 61:     @contextlib.contextmanager
Line 62:     def track_time(self, tag):
Line 63:         """


Line 67:         self._working_on = tag
Line 68:         self._work_started_at = self._timefunc()
Line 69:         yield
Line 70:         self._work_started_at = None
Line 71:         self._working_on = None
> Not needed. The _do_work of the worker can simply do:
Agreed on the general concept, and agreed that this can be simplified: will do.

I cannot adopt your suggestion directly, because neither the worker thread nor
the supervisor needs to know how much time a task *took*. Instead, the
supervisor will asynchronously check how much time a task is *taking* (while it
is still in progress), in order to detect stuck tasks (tasks that have been
processed for longer than a threshold).

However, there is definitely room for simplification, and I will do that.
Line 72: 
Line 73: 
Line 74: class Worker(TimeTrackingThread):
Line 75:     """
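
A minimal sketch of the stuck-task detection described above, assuming the
worker only publishes *what* it is running and *since when*, while a supervisor
polls that state from another thread and compares it with a threshold. The
names TrackedWorker, execute, snapshot, find_stuck and the threshold parameter
are hypothetical, not part of the patch (Python 3).

```python
import threading
import time


class TrackedWorker(object):
    """Hypothetical worker: publishes its current task and its start time."""

    def __init__(self, timefunc=time.monotonic):
        self._timefunc = timefunc
        self._lock = threading.Lock()
        self._task = None
        self._started_at = None

    def execute(self, tag, work):
        # Record the start time *before* running the work, so that the
        # supervisor can see the task while it is still in progress.
        with self._lock:
            self._task = tag
            self._started_at = self._timefunc()
        try:
            work()
        finally:
            with self._lock:
                self._task = None
                self._started_at = None

    def snapshot(self):
        """Return (task, started_at), or (None, None) when idle."""
        with self._lock:
            return self._task, self._started_at


def find_stuck(workers, threshold, now):
    """Return the tags of tasks running for longer than `threshold`."""
    stuck = []
    for worker in workers:
        task, started_at = worker.snapshot()
        if started_at is not None and now - started_at > threshold:
            stuck.append(task)
    return stuck
```

Neither side ever computes how long a task *took*: the elapsed time is derived
on demand by the supervisor, which is the asynchronous check described above.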


Line 79:     """
Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
> What's wrong with the builtin Queue?
The standard library provides a nice PriorityQueue, which is almost what I need
here, except that it has no notion of time, which I need to implement periodic
tasks for interval sampling (poll X every Y seconds).

On the other hand, there is the sched module:
https://docs.python.org/2/library/sched.html
which has a notion of time, but is not thread-safe and has a blocking
interface (see the delayfunc argument).
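
To illustrate the blocking interface: sched.scheduler.run() waits for the next
event by calling delayfunc itself, so the calling thread is tied up until the
queue is drained. The FakeClock class is an assumption used only to make those
waits visible without really sleeping; the example runs on Python 3.

```python
import sched


class FakeClock(object):
    """Fake time source: sleep() just advances the clock and logs the delay."""

    def __init__(self):
        self.now = 0.0
        self.delays = []

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.delays.append(seconds)
        self.now += seconds


clock = FakeClock()
fired = []
scheduler = sched.scheduler(clock.time, clock.sleep)
scheduler.enter(5, 1, fired.append, argument=("poll-b",))
scheduler.enter(2, 1, fired.append, argument=("poll-a",))

# run() returns only once every event has fired; in between, it calls
# delayfunc (here clock.sleep) to wait for the next deadline.
scheduler.run()
```

Both the waiting and the actions happen inside the thread that called run(),
which is why it does not map directly onto a pool of workers pulling from a
shared queue.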

I needed to blend these concepts together somehow, hence the
workqueue/schedqueue classes.
Line 84:         self._stop = threading.Event()
Line 85: 
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
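
A minimal sketch of what blending the two ideas could look like, assuming a
heap ordered by due time and guarded by a threading.Condition: put() may be
called from any thread, and get() blocks until the earliest item is due.
TimedQueue, put, get and the timefunc parameter are hypothetical names, not
the API of the patch's schedqueue module (Python 3).

```python
import heapq
import itertools
import threading
import time


class TimedQueue(object):
    """Thread-safe queue that hands out items only once they are due."""

    def __init__(self, timefunc=time.monotonic):
        self._timefunc = timefunc
        self._heap = []                  # entries: (due_time, seq, item)
        self._seq = itertools.count()    # tie-breaker: FIFO for equal times
        self._cond = threading.Condition()

    def put(self, item, delay=0.0):
        with self._cond:
            due = self._timefunc() + delay
            heapq.heappush(self._heap, (due, next(self._seq), item))
            # Wake waiting getters: the new item may be due earlier than
            # the one they are currently waiting for.
            self._cond.notify_all()

    def get(self, timeout=None):
        """Return the earliest due item, or None if timeout expires first."""
        deadline = None if timeout is None else self._timefunc() + timeout
        with self._cond:
            while True:
                now = self._timefunc()
                if self._heap and self._heap[0][0] <= now:
                    return heapq.heappop(self._heap)[2]
                # Sleep until the head becomes due, a put() arrives, or the
                # caller's timeout expires, whichever comes first.
                waits = []
                if self._heap:
                    waits.append(self._heap[0][0] - now)
                if deadline is not None:
                    if now >= deadline:
                        return None
                    waits.append(deadline - now)
                self._cond.wait(min(waits) if waits else None)
```

Since Condition.wait() measures its timeout in real time, the timefunc should
stay a real monotonic clock in this sketch.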


Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()
Line 85: 
> Use @traceback
Will do
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88: 
Line 89:         while not self._stop.is_set():
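
The reviewer refers to vdsm's own @traceback decorator, whose exact signature
is not shown in this thread. A minimal sketch of the same idea, assuming the
goal is that an exception escaping run() is logged with its traceback instead
of silently killing a daemon thread. log_traceback and its parameters are
hypothetical names, not vdsm's actual code, and the Worker class here is only
a stand-in (Python 3).

```python
import functools
import logging


def log_traceback(logger_name="", msg="Unhandled exception"):
    """Hypothetical decorator: log any exception escaping f, then re-raise."""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception:
                # Logger.exception() attaches the current traceback.
                logging.getLogger(logger_name).exception(msg)
                raise  # do not swallow the error
        return wrapper
    return decorator


class Worker(object):  # hypothetical stand-in for the patch's Worker
    @log_traceback(logger_name="threadpool.worker")
    def run(self):
        raise RuntimeError("boom")
```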


Line 115:                 yield item
Line 116:         except schedqueue.NotYet:
Line 117:             yield None
Line 118:         except schedqueue.Empty:
Line 119:             yield None
> Unneeded complexity.
I have to agree. All this dance is done to let the queue transparently reinject 
a periodic task into itself.

I'm starting to think I should simply do this in the worker thread and save
quite a lot of complexity.
Line 120: 
Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.
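
A minimal sketch of moving the reinjection into the worker, assuming each work
item carries an optional interval: after running a periodic item, the worker
puts it back with its next due time, so the queue stays a plain priority queue
with no knowledge of periodic tasks. PeriodicItem, schedule and run_due_items
are hypothetical names; a bare heap with an explicit `now` replaces the
thread-safe queue to keep the example deterministic (Python 3).

```python
import heapq
import itertools
import logging
from collections import namedtuple

# Hypothetical work item: interval=None means "run once".
PeriodicItem = namedtuple("PeriodicItem", "tag func interval")

_seq = itertools.count()  # tie-breaker so items themselves are never compared


def schedule(heap, due, item):
    heapq.heappush(heap, (due, next(_seq), item))


def run_due_items(heap, now):
    """Run every item due at `now`; periodic ones are re-queued here."""
    while heap and heap[0][0] <= now:
        due, _, item = heapq.heappop(heap)
        try:
            item.func()
        except Exception:
            logging.exception("work %s failed", item.tag)
        if item.interval is not None:
            # The worker, not the queue, decides when the item runs again.
            schedule(heap, due + item.interval, item)
```

Rescheduling from `due + interval` rather than from the current time avoids
drift, at the cost of back-to-back catch-up runs when the worker falls behind.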


Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:
Line 130:                 logging.exception('work failed', exc_info=True)
> Don't pass exc_info, logging.exception() logs a traceback anyway.
Will do.
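
The reviewer's point can be checked directly: Logger.exception() logs at ERROR
level with the current exception attached, so passing exc_info=True adds
nothing. A small sketch that captures the emitted record; the ListHandler
class, the logger name and do_work are illustrative only (Python 3).

```python
import logging


class ListHandler(logging.Handler):
    """Collect emitted records in memory instead of printing them."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


log = logging.getLogger("threadpool.demo")
handler = ListHandler()
log.addHandler(handler)
log.propagate = False


def do_work(work, *args, **kwargs):
    try:
        work(*args, **kwargs)
    except Exception:
        # No exc_info argument needed: exception() sets it automatically.
        log.exception("work failed")


do_work(lambda: 1 / 0)
```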


-- 
To view, visit http://gerrit.ovirt.org/29191

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 8
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Francesco Romani <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches