Nir Soffer has posted comments on this change.

Change subject: threadpool: add a thread pool with watchdog
......................................................................


Patch Set 8:

(11 comments)

On first look, the worker module is over-engineered and too complex. From the 
description, we need a simple pool of threads with an extra supervisor thread 
to handle stuck tasks.

We need the simplest thread pool that does only what vdsm needs and nothing 
else.

http://gerrit.ovirt.org/#/c/29191/8/lib/threadpool/worker.py
File lib/threadpool/worker.py:

Line 25: 
Line 26: import schedqueue
Line 27: 
Line 28: 
Line 29: class TimeTrackingThread(threading.Thread):
Please don't add unneeded inheritance. If we need time tracking, add it to the 
worker thread class.

And especially do not inherit from Thread - instead keep a thread instance 
variable. This allows us to implement the interface that we'd like to have, and 
we do not depend on Thread.
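
For example, something like this (all names here are illustrative, not from 
the patch):

```python
import threading

class Worker(object):
    """
    Owns a thread instead of being one, so the public interface
    is only what we choose to expose.
    """

    def __init__(self, name):
        self.name = name
        self._thread = threading.Thread(target=self._run, name=name)
        self._thread.daemon = True

    def start(self):
        self._thread.start()

    def join(self, timeout=None):
        self._thread.join(timeout)

    def _run(self):
        pass  # pull items from the work queue and process them
```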
Line 30:     """
Line 31:     Thin mixin to help a worker thread to track the time
Line 32:     consumed by a work item.
Line 33:     """


Line 37:         self._working_on = None
Line 38:         self._timefunc = timefunc
Line 39: 
Line 40:     @property
Line 41:     def busy(self):
Not related to time tracking
Line 42:         """
Line 43:         is the thread doing some work or is it idle?
Line 44:         """
Line 45:         return self._work_started_at is not None


Line 55:     def task(self):
Line 56:         """
Line 57:         what is this worker doing?
Line 58:         """
Line 59:         return self._working_on
Naming it _task is less surprising.
Line 60: 
Line 61:     @contextlib.contextmanager
Line 62:     def track_time(self, tag):
Line 63:         """


Line 67:         self._working_on = tag
Line 68:         self._work_started_at = self._timefunc()
Line 69:         yield
Line 70:         self._work_started_at = None
Line 71:         self._working_on = None
Not needed. The _do_work of the worker can simply do:

    start = self.clock.time()
    try:
        run task...
    finally:
        self.elapsed = self.clock.time() - start

If you want a generic time tracking utility, put it in utils.py:

    class StopWatch:

        def __init__(self):
            self.start = None
            self.elapsed = None

        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *args):
            self.elapsed = time.time() - self.start

And use it:

    with StopWatch() as stopwatch:
        do stuff...

    logging.debug("did stuff in %.2f seconds", stopwatch.elapsed)

We don't need a complex class hierarchy for this.
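
If determinism matters in tests, the same utility can take an injectable 
clock, like the patch's timefunc - a sketch, not tied to the patch's API:

```python
import time

class StopWatch(object):
    """Context manager measuring elapsed time via a pluggable clock."""

    def __init__(self, clock=time.time):
        self._clock = clock
        self.start = None
        self.elapsed = None

    def __enter__(self):
        self.start = self._clock()
        return self

    def __exit__(self, *args):
        self.elapsed = self._clock() - self.start

# A fake clock makes the measurement deterministic in tests:
ticks = iter([10.0, 12.5])
with StopWatch(clock=lambda: next(ticks)) as sw:
    pass

# sw.elapsed == 2.5
```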
Line 72: 
Line 73: 
Line 74: class Worker(TimeTrackingThread):
Line 75:     """


Line 79:     """
Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
What's wrong with the builtin Queue?
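
The builtin Queue already gives a blocking get with an optional timeout, which 
seems to be all the pool needs - a sketch using Python 3's queue module (named 
Queue on the Python 2 this patch targets):

```python
import queue  # the stdlib module is named Queue on Python 2

tasks = queue.Queue()
tasks.put(("tag-1", sum, ([1, 2, 3],), {}))

try:
    # Blocks up to 1 second waiting for work, then raises queue.Empty.
    tag, func, args, kwargs = tasks.get(timeout=1.0)
    result = func(*args, **kwargs)
except queue.Empty:
    result = None  # no work available; go around the loop again
```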
Line 84:         self._stop = threading.Event()
Line 85: 
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)


Line 80:     def __init__(self, work_queue):
Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()
This allows stopping immediately - but usually you want to stop when work in 
the queue is done. Also, this adds another lock and condition for each worker, 
while we can have a single lock and condition for the entire thread pool if we 
use the method I describe below.

To stop a worker, put a sentinel object into the queue:

    _STOP = object()

In ThreadPool:

    def stop_some_worker(self):
        self._queue.put(_STOP)

In Worker:

    def _run(self):
        while self._running:
            self._process()

    def _process(self):
        it = self._queue.get()
        if it is _STOP:
            self._running = False
            return
        process it...

If I want to stop N workers, I add N _STOP objects into the queue. If I want 
to stop when the work in the queue is done, I add the _STOP objects at the end 
of the queue. If I want to stop immediately, I add them at the front of the 
queue.

We may need to cancel a stuck thread, but for this the stop event does not 
help. We need a way to unblock a stuck task so that it raises an exception 
that causes the thread to exit cleanly, or just fails the current task.

For example, if the current task is stuck on a socket, we can make it fail and 
let the thread continue to process the next request.
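
Putting the pieces together, a self-contained sketch of the sentinel scheme 
(illustrative names, Python 3's queue module):

```python
import threading
import queue  # named Queue on Python 2

_STOP = object()  # identity comparison cannot collide with real work items

def worker(q, results):
    while True:
        item = q.get()
        if item is _STOP:
            return  # exit cleanly, no per-worker Event needed
        results.append(item * 2)

q = queue.Queue()
results = []
for n in range(3):
    q.put(n)
q.put(_STOP)  # at the end: the worker drains queued work first, then stops

t = threading.Thread(target=worker, args=(q, results))
t.start()
t.join()
# results == [0, 2, 4]
```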
Line 85: 
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88: 


Line 81:         super(Worker, self).__init__()
Line 82:         self.daemon = True
Line 83:         self._work_queue = work_queue
Line 84:         self._stop = threading.Event()
Line 85: 
Use @traceback
Line 86:     def run(self):
Line 87:         logging.info('worker %s starting', self.name)
Line 88: 
Line 89:         while not self._stop.is_set():


Line 95:         """
Line 96:         execute a single try of work processing.
Line 97:         """
Line 98:         with self._pull() as item:
Line 99:             if item is not None:
Why do you want to support None?
Line 100:                 self._do_work(item)
Line 101: 
Line 102:     def stop(self):
Line 103:         """


Line 115:                 yield item
Line 116:         except schedqueue.NotYet:
Line 117:             yield None
Line 118:         except schedqueue.Empty:
Line 119:             yield None
Unneeded complexity.
Line 120: 
Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.


Line 121:     def _do_work(self, item):
Line 122:         """
Line 123:         process a single work unit.
Line 124:         """
Line 125:         tag, work, args, kwargs = item
work is a function or callable - why not use an obvious name?
Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:


Line 126:         with self.track_time(tag):
Line 127:             try:
Line 128:                 work(*args, **kwargs)
Line 129:             except Exception:
Line 130:                 logging.exception('work failed', exc_info=True)
Don't use the redundant exc_info argument - logging.exception logs a traceback 
anyway.
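
A quick check showing that logging.exception includes the traceback on its own 
(the logger name here is arbitrary):

```python
import io
import logging

stream = io.StringIO()
logger = logging.getLogger("worker-demo")
logger.addHandler(logging.StreamHandler(stream))

try:
    1 / 0
except Exception:
    logger.exception('work failed')  # no exc_info needed

output = stream.getvalue()
# output contains the message, "Traceback", and "ZeroDivisionError"
```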


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: comment
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 8
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
Gerrit-Reviewer: Antoni Segura Puimedon <[email protected]>
Gerrit-Reviewer: Dan Kenigsberg <[email protected]>
Gerrit-Reviewer: Dima Kuznetsov <[email protected]>
Gerrit-Reviewer: Federico Simoncelli <[email protected]>
Gerrit-Reviewer: Michal Skrivanek <[email protected]>
Gerrit-Reviewer: Nir Soffer <[email protected]>
Gerrit-Reviewer: Saggi Mizrahi <[email protected]>
Gerrit-Reviewer: Vinzenz Feenstra <[email protected]>
Gerrit-Reviewer: Yaniv Bronhaim <[email protected]>
Gerrit-Reviewer: [email protected]
Gerrit-Reviewer: oVirt Jenkins CI Server
Gerrit-HasComments: Yes
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
