Francesco Romani has uploaded a new change for review. Change subject: threadpool: add a thread pool with watchdog ......................................................................
threadpool: add a thread pool with watchdog Add a new thread pool implementation with built-in watchdog. The watchdog is needed to accommodate the needs of the virt sampling code. The virt sampling needs to deal with possibly blocking I/O-like calls which cannot be made non-blocking. Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2 Signed-off-by: Francesco Romani <[email protected]> --- M lib/threadpool/Makefile.am A lib/threadpool/examples/demo.py A lib/threadpool/schedqueue.py A lib/threadpool/tests/test_schedqueue.py A lib/threadpool/tests/test_timetrackingthread.py A lib/threadpool/tests/test_worker.py A lib/threadpool/tests/test_workqueue.py A lib/threadpool/watchedpool.py A lib/threadpool/watchman.py A lib/threadpool/worker.py A lib/threadpool/workqueue.py 11 files changed, 1,003 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/91/29191/1 diff --git a/lib/threadpool/Makefile.am b/lib/threadpool/Makefile.am index 3a71a5d..8b7098e 100644 --- a/lib/threadpool/Makefile.am +++ b/lib/threadpool/Makefile.am @@ -22,4 +22,9 @@ dist_threadpool_PYTHON = \ __init__.py \ + schedqueue.py \ + watchedpool.py \ + watchman.py \ + worker.py \ + workqueue.py \ $(NULL) diff --git a/lib/threadpool/examples/demo.py b/lib/threadpool/examples/demo.py new file mode 100644 index 0000000..cf80930 --- /dev/null +++ b/lib/threadpool/examples/demo.py @@ -0,0 +1,62 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import logging
+import random
+import sys
+import time
+
+from watchedpool import WatchedThreadPool
+
+
+def _test_fun(*args):
+    """
+    just block the caller for a random time span (1 to 15 seconds).
+    """
+    tmo = random.randint(1, 15)  # may exceed the default 6s busy timeout
+    logging.info('started, waiting: %.3fs', tmo)
+    time.sleep(tmo)
+    logging.info('done')
+
+
+def _main(args):
+    """
+    main test driver: run one periodic job per worker until interrupted.
+    """
+    nworkers = int(args[0]) if len(args) >= 1 else 1
+    pool = WatchedThreadPool(nworkers)
+    pool.start()
+    for _ in range(nworkers):
+        pool.submit_periodic(4, _test_fun)
+
+    try:
+        while True:
+            time.sleep(1.0)
+    except KeyboardInterrupt:
+        logging.debug('exiting!')
+    finally:
+        pool.stop()
+
+
+if __name__ == '__main__':
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s %(threadName)-10s %(levelname)-8s %(message)s')
+    _main(sys.argv[1:])
diff --git a/lib/threadpool/schedqueue.py b/lib/threadpool/schedqueue.py
new file mode 100644
index 0000000..c2bfbae
--- /dev/null
+++ b/lib/threadpool/schedqueue.py
@@ -0,0 +1,123 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+Time-aware priority queue, built like queue.Queue
+but tailored for WatchedThreadPool needs.
+"""
+
+import heapq
+import threading
+import time
+
+
+class QueueError(Exception):
+    """
+    Queue exception root.
+    """
+
+
+class Empty(QueueError):
+    """
+    Attempt to get an item from an empty queue.
+    """
+
+
+class Full(QueueError):
+    """
+    Attempt to put an item into a full queue.
+    """
+
+
+class NotYet(QueueError):
+    """
+    Attempt to get an item which is not due at the
+    present (reported) time.
+    """
+
+
+class SchedQueue(object):
+    """
+    Time-aware, not blocking (client must take care of
+    waiting/timing) priority queue, ordered by due time.
+    """
+    def __init__(self, maxsize=0, timefunc=time.time):
+        self._timefunc = timefunc
+        self._maxsize = maxsize  # <= 0 means unbounded, see put()
+        self._queue = self._init(maxsize)
+        self._lock = threading.Lock()
+
+    def __len__(self):
+        with self._lock:
+            return self._size()
+
+    def put(self, item, delay=0):
+        """
+        Put an item into the queue, to be made ready 'delay' seconds
+        in the future. Raises Full if the queue is bounded and full.
+        """
+        with self._lock:
+            if self._maxsize > 0 and self._size() == self._maxsize:
+                raise Full
+            self._put((self._timefunc() + delay, item))  # (due, item)
+
+    def get(self, timenow):
+        """
+        Ask for the earliest-due item; Empty/NotYet if none is due.
+        """
+        with self._lock:
+            if not self._size():
+                raise Empty
+            timeval, _ = self._peek()  # due time of the earliest item
+            if timeval > timenow:
+                raise NotYet
+            _, item = self._get()
+            return item
+
+    def _init(self, maxsize):
+        """
+        builds the internal queue.
+        """
+        self._maxsize = maxsize  # to make pylint happy
+        return []
+
+    def _size(self):
+        """
+        size of the internal queue.
+        """
+        return len(self._queue)
+
+    def _put(self, item):
+        """
+        push a new (due_time, item) pair onto the heap.
+        """
+        heapq.heappush(self._queue, item)  # NOTE: due-time ties compare items
+
+    def _get(self):
+        """
+        pops (and returns) the pair with the earliest due time.
+        """
+        return heapq.heappop(self._queue)
+
+    def _peek(self):
+        """
+        read-only view of the earliest-due pair (queue must not be empty).
+        """
+        return self._queue[0]
diff --git a/lib/threadpool/tests/test_schedqueue.py b/lib/threadpool/tests/test_schedqueue.py
new file mode 100644
index 0000000..3595589
--- /dev/null
+++ b/lib/threadpool/tests/test_schedqueue.py
@@ -0,0 +1,56 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import schedqueue
+
+
+class SchedQueueTests(unittest.TestCase):
+    def _faketime(self):
+        ret = self._ts
+        self._ts += 1  # every clock reading advances the fake time by one
+        return ret
+
+    def setUp(self):
+        self._ts = 0
+        self._sq = schedqueue.SchedQueue(timefunc=self._faketime)
+
+    def test_empty_on_create(self):
+        self.assertEqual(len(self._sq), 0)
+
+    def test_put_on_full(self):
+        self._sq = schedqueue.SchedQueue(maxsize=1, timefunc=self._faketime)
+        self._sq.put('foo')
+        self.assertRaises(schedqueue.Full, self._sq.put, 'bar')
+
+    def test_get_on_empty(self):
+        self.assertRaises(schedqueue.Empty, self._sq.get, 1)
+
+    def test_get_too_early(self):
+        item = 'foo'
+        self._sq.put(item, delay=5)  # clock reads 0 here: due at time 5
+        self.assertRaises(schedqueue.NotYet, self._sq.get, 0)
+
+    def test_put_get(self):
+        item = 'foo'
+        self._sq.put(item)  # no delay: due at time 0
+        res = self._sq.get(timenow=1)
+        self.assertEqual(res, item)
diff --git a/lib/threadpool/tests/test_timetrackingthread.py b/lib/threadpool/tests/test_timetrackingthread.py
new file mode 100644
index 0000000..14a25ef
--- /dev/null
+++ b/lib/threadpool/tests/test_timetrackingthread.py
@@ -0,0 +1,64 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import worker
+
+
+class TimeTrackingThreadTests(unittest.TestCase):
+    def _fakesleep(self, delay):
+        self._ts += max(0, delay)  # avoid negative delays
+
+    def _faketime(self):
+        ret = self._ts
+        self._ts += 1  # every clock reading advances the fake time by one
+        return ret
+
+    def setUp(self):
+        self._ts = 0
+        self._ttt = worker.TimeTrackingThread(self._faketime)  # not started
+
+    def assertIdle(self):
+        self.assertFalse(self._ttt.busy)
+        self.assertEqual(self._ttt.task, None)
+
+    def assertBusyOn(self, task):
+        self.assertTrue(self._ttt.busy)
+        self.assertEqual(self._ttt.task, task)
+
+    def test_created_free(self):
+        self.assertIdle()
+
+    def test_carry_task(self):
+        self.assertIdle()
+
+        task = 'foobar'
+        with self._ttt.track_time(task):
+            self.assertBusyOn(task)
+
+        self.assertIdle()
+
+    def test_elapsed(self):
+        task = 'barbaz'
+        delay = 5
+        with self._ttt.track_time(task):
+            self._fakesleep(delay)
+            self.assertTrue(self._ttt.elapsed >= delay)  # still tracking
diff --git a/lib/threadpool/tests/test_worker.py b/lib/threadpool/tests/test_worker.py
new file mode 100644
index 0000000..7396673
--- /dev/null
+++ b/lib/threadpool/tests/test_worker.py
@@ -0,0 +1,60 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import unittest
+
+import schedqueue
+import worker
+
+
+class FakeWorkQueue(object):
+    def __init__(self, items):
+        self._items = items  # work items, or exception classes to raise
+
+    @contextlib.contextmanager
+    def fetch(self):
+        if not self._items:
+            raise schedqueue.Empty
+        item = self._items.pop(0)
+        if issubclass(item, Exception):  # NOTE: issubclass() needs a class
+            raise item
+        else:
+            yield item
+
+
+class WorkerThreadTests(unittest.TestCase):
+    def test__pull_empty(self):
+        wq = FakeWorkQueue([])  # fetch() raises Empty before yielding
+        wt = worker.Worker(wq)
+        with wt._pull() as item:
+            self.assertEqual(item, (None, None, None, None))
+
+    def test__pull_empty_exc(self):
+        wq = FakeWorkQueue([schedqueue.Empty])
+        wt = worker.Worker(wq)
+        with wt._pull() as item:
+            self.assertEqual(item, (None, None, None, None))
+
+    def test__pull_notyet(self):
+        wq = FakeWorkQueue([schedqueue.NotYet])
+        wt = worker.Worker(wq)
+        with wt._pull() as item:
+            self.assertEqual(item, (None, None, None, None))
diff --git a/lib/threadpool/tests/test_workqueue.py b/lib/threadpool/tests/test_workqueue.py
new file mode 100644
index 0000000..68e982d
--- /dev/null
+++ b/lib/threadpool/tests/test_workqueue.py
@@ -0,0 +1,80 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import schedqueue
+import watchedpool  # WorkQueue is reachable via watchedpool's own import
+
+
+def _do_nothing(*args, **kwargs):
+    pass
+
+
+class WorkQueueTests(unittest.TestCase):
+    def _fakesleep(self, delay):
+        pass  # never block: WorkQueue sleeps before every poll
+
+    def _faketime(self):
+        ret = self._ts
+        self._ts += 1  # every clock reading advances the fake time by one
+        return ret
+
+    def assertDummyWork(self, workitem, in_tag=None):
+        out_tag, work, args, kwargs = workitem
+        if in_tag is not None:
+            self.assertEqual(in_tag, out_tag)
+        self.assertEqual(work, _do_nothing)
+        self.assertEqual(len(args), 0)
+        self.assertEqual(len(kwargs), 0)
+
+    def setUp(self):
+        self._ts = 0
+        self._wq = watchedpool.WorkQueue(
+            timefunc=self._faketime,
+            delayfunc=self._fakesleep)
+
+    def test_created_empty(self):
+        self.assertEqual(len(self._wq), 0)
+
+    def test_get_on_empty(self):
+        self.assertRaises(schedqueue.Empty, self._wq.get)
+
+    def test_post_on_empty(self):
+        self._wq.post(0, _do_nothing)
+        self.assertEqual(len(self._wq), 1)
+
+    def test_put_get(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 0, _do_nothing)
+        self.assertDummyWork(self._wq.get())
+
+    def test_remove(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 0, _do_nothing)
+        self._wq.remove(in_tag)  # the queued tag is now stale
+        self.assertRaises(schedqueue.Empty, self._wq.get)
+
+    def test_fetch(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 1, _do_nothing)  # non-zero interval: periodic
+        with self._wq.fetch() as work_item:
+            self.assertDummyWork(work_item)
+        self.assertEqual(len(self._wq), 1)  # periodic work is kept
diff --git a/lib/threadpool/watchedpool.py b/lib/threadpool/watchedpool.py
new file mode 100644
index 0000000..6a8938f
--- /dev/null
+++ b/lib/threadpool/watchedpool.py
@@ -0,0 +1,128 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+PoC of a watched thread pool implementation.
+This thread pool adds a watcher/watchman thread which detaches `rogue'
+threads from the main pool, and asks them to die as soon as possible.
+The purpose is to avoid worker threads getting stuck too much on
+busy, possibly blocking operations (e.g. risky I/O).
+
+In the worst case scenario, the pool size degenerates in a
+thread-per-operation. In the common and expected case,
+the pool size is fixed and rogue threads are just spikes
+in the thread number.
+
+TODO:
+- allow the client to resize dynamically the pool [?]
+- let this code be easily mergeable with vdsm/storage/threadPool.py
+- demonstrate that this code does not leak threads.
+"""
+
+import logging
+import threading
+
+from watchman import Watchman
+from worker import Worker
+from workqueue import WorkQueue
+
+
+class WatchedThreadPool(object):
+    """
+    Straightforward thread pool, with an added Watchman.
+    """
+    def __init__(self, nworkers=1, work_interval=1):
+        self._nworkers = nworkers
+        self._work_interval = work_interval
+        self._watch_interval = 2 * self._work_interval
+        self._busy_timeout = 3 * self._watch_interval
+        self._work_queue = None
+        self._workers = None
+        self._watcher = None
+        self._running = False
+        self._lock = threading.Lock()
+
+    def start(self):
+        """
+        starts the pool. Thread safe.
+        """
+        with self._lock:
+            if not self._running:
+                self._work_queue = WorkQueue(self._work_interval)
+                self._workers = [self._make_worker()
+                                 for _ in range(self._nworkers)]
+                self._watcher = Watchman(self._workers,
+                                         self._make_worker,
+                                         self._watch_interval,
+                                         self._busy_timeout)
+                self._watcher.start()
+                logging.info('pool started')
+                self._running = True
+
+    def stop(self, wait=True):
+        """
+        terminate all pooled threads. Thread safe.
+        """
+        with self._lock:
+            if self._running:
+                self._running = False
+
+                self._watcher.stop()
+                if wait:
+                    for worker in self._workers:
+                        worker.join()  # waits for its current task, if any
+                    # join the watcher first, so reap() cannot race with it
+                    self._watcher.join()
+                    self._watcher.reap([])  # last chance
+                    del self._watcher
+
+    def submit_periodic(self, interval, work, *args, **kwargs):
+        """
+        submit a new work unit to the pool.
+        returns the work unit tag (None if the pool is not running).
+        The tag is guaranteed to be unique per-pool.
+        """
+        if self._running:
+            tag = self._work_queue.post(interval, work, *args, **kwargs)
+            logging.debug('new work: (%s, %s, %s) -> %s',
+                          str(work), str(args), str(kwargs), tag)
+            return tag  # FIXME
+        else:
+            return None
+
+    def remove(self, tag):
+        """
+        remove a work unit from the pool.
+        """
+        return self._work_queue.remove(tag)
+
+    def notify_timeout(self, tag, on_timeout):
+        """
+        register a timeout callback for the given work unit.
+        """
+        self._watcher.notify_timeout(tag, on_timeout)
+
+    def _make_worker(self):
+        """
+        creates a new idle (not started) worker.
+        """
+        worker = Worker(self._work_queue)
+        logging.info('worker %s added to the pool', worker.name)
+        return worker
diff --git a/lib/threadpool/watchman.py b/lib/threadpool/watchman.py
new file mode 100644
index 0000000..53bfc11
--- /dev/null
+++ b/lib/threadpool/watchman.py
@@ -0,0 +1,209 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+PoC of a watched thread pool implementation.
+This thread pool adds a watcher/watchman thread which detaches `rogue'
+threads from the main pool, and asks them to die as soon as possible.
+The purpose is to avoid worker threads getting stuck too much on
+busy, possibly blocking operations (e.g. risky I/O).
+
+In the worst case scenario, the pool size degenerates in a
+thread-per-operation. In the common and expected case,
+the pool size is fixed and rogue threads are just spikes
+in the thread number.
+
+TODO:
+- allow the client to resize dynamically the pool [?]
+- let this code be easily mergeable with vdsm/storage/threadPool.py
+- demonstrate that this code does not leak threads.
+"""
+
+import logging
+import threading
+import time
+
+
+class Watchman(threading.Thread):
+    """
+    Watches the worker threads, detects the rogues, if any,
+    and possibly replaces them to ensure the pool is operative
+    and not blocked.
+    """
+    def __init__(self, workers, make_worker,
+                 watch_interval=5, busy_timeout=10,
+                 rogues_cap=None, delayfunc=time.sleep):
+        super(Watchman, self).__init__()
+        self.daemon = True
+        self._workers = workers  # caller's list, kept updated in place
+        self._rogues = []
+        self._rogues_cap = rogues_cap
+        self._watch_interval = watch_interval
+        self._make_worker = make_worker
+        self._busy_timeout = busy_timeout
+        self._timeout_cb = {}
+        self._stop_event = threading.Event()  # not _stop: Thread uses it
+        self._running = True
+        self._delayfunc = delayfunc
+
+    def suspend(self):
+        """
+        suspend the watch, without stopping.
+        """
+        self._running = False
+
+    def resume(self):
+        """
+        resume the watch.
+        """
+        self._running = True
+
+    def start(self):
+        """
+        start the watched pool and the watcher itself.
+        """
+        for worker in self._workers:
+            worker.start()
+        super(Watchman, self).start()
+
+    def stop(self):
+        """
+        stop the watched pool, and the watcher itself.
+        """
+        self.suspend()
+        for worker in self._workers:
+            worker.stop()
+        self._stop_event.set()
+
+    def run(self):
+        logging.debug(
+            'watcher starting (with %i workers)', len(self._workers))
+
+        while not self._stop_event.is_set():
+            self._delayfunc(self._watch_interval)
+            if self._running:
+                workers, rogues, newbies = self.verify()
+                self.enforce(workers, rogues, newbies)
+
+        logging.debug('watcher done')
+
+    def notify_timeout(self, tag, on_timeout):
+        """
+        register a timeout callback for the given work unit.
+        """
+        self._timeout_cb[tag] = on_timeout
+
+    def stuck(self, worker):
+        """
+        is a given worker stuck? Returns a (stuck, task) pair.
+        """
+        stuck, task = False, None
+        if worker.busy:
+            # use locals to freeze the state
+            task = worker.task
+            elapsed = worker.elapsed
+            stuck = elapsed > self._busy_timeout
+            logging.warning(
+                '%s busy on %s for the last %02i seconds (max=%02i) -> %s',
+                worker.name, task, elapsed, self._busy_timeout,
+                'STUCK' if stuck else 'OK')
+        return stuck, task
+
+    def verify(self):
+        """
+        check the health of the worker pool, and classify
+        it into
+        - workers: threads kept in the pool (healthy, or stuck but kept)
+        - rogues: workers got stuck, to be evicted from the pool
+        - newbies: new workers ready to replace rogues
+        """
+        try:
+            workers, rogues, newbies = [], [], []
+            logging.debug('verification loop started')
+            for worker in self._workers:
+                stuck, task = self.stuck(worker)
+                if stuck and not self._room_for_rogues(rogues):
+                    logging.warning(
+                        'worker %s gone rogue, but cap reached', worker.name)
+                elif stuck and not self._is_detachable(task):
+                    logging.warning(
+                        'worker %s gone rogue, but task %s not detachable',
+                        worker.name, task)
+                elif stuck:
+                    logging.info(
+                        'worker %s gone rogue, replacing', worker.name)
+                    worker.stop()
+                    rogues.append(worker)  # until it dies
+                    newbies.append(self._make_worker())
+                    continue
+                workers.append(worker)  # healthy, or stuck but not evicted
+            logging.debug('verification loop done: %i rogues', len(rogues))
+            return workers, rogues, newbies
+        except Exception:
+            logging.exception('verify failed')
+            return list(self._workers), [], []  # keep current worker list
+
+    def enforce(self, workers, rogues, newbies):
+        """
+        enforce the pool size by evicting rogues into a limbo,
+        and by replenishing the worker pool with the provided
+        newbies.
+        """
+        try:
+            for newbie in newbies:
+                newbie.start()
+                workers.append(newbie)
+            self._workers[:] = workers  # in place: the list may be shared
+            self.reap(rogues)
+        except Exception:
+            logging.exception('enforce failed')
+
+    def reap(self, rogues):
+        """
+        collect the terminated rogues; still-alive ones are kept for later.
+        """
+        rogues.extend(self._rogues)
+        new_rogues = []
+        for rogue in rogues:
+            if rogue.is_alive():
+                new_rogues.append(rogue)
+            else:
+                logging.info('rogue thread %s collected', rogue.name)
+                rogue.join()
+        self._rogues = new_rogues
+
+    def _room_for_rogues(self, rogues):
+        return (self._rogues_cap is None
+                or len(rogues) < self._rogues_cap)
+
+    def _is_detachable(self, task):
+        """
+        if a task is detachable, its worker thread will be detached
+        by the pool. Otherwise the thread will be kept in the pool
+        """
+        on_timeout = self._timeout_cb.get(task, _on_timeout_dummy)
+        return on_timeout(task)
+
+
+def _on_timeout_dummy(*args):
+    """
+    Dummy timeout callback. Marks every work unit as detachable.
+    """
+    return True
diff --git a/lib/threadpool/worker.py b/lib/threadpool/worker.py
new file mode 100644
index 0000000..fe63d34
--- /dev/null
+++ b/lib/threadpool/worker.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import logging
+import threading
+import time
+
+import schedqueue
+
+
+class TimeTrackingThread(threading.Thread):
+    """
+    Thin Thread subclass to help a worker thread to track the time
+    consumed by a work item.
+    """
+    def __init__(self, timefunc=time.time):
+        super(TimeTrackingThread, self).__init__()
+        self._work_started_at = None
+        self._working_on = None
+        self._timefunc = timefunc
+
+    @property
+    def busy(self):
+        """
+        is the thread doing some work or is it idle?
+        """
+        return self._work_started_at is not None
+
+    @property
+    def elapsed(self):
+        """how long the current work unit has been running (0 when idle)"""
+        # read the start time once: the work unit may end concurrently
+        started = self._work_started_at
+        return 0 if started is None else self._timefunc() - started
+
+    @property
+    def task(self):
+        """
+        what is this worker doing?
+        """
+        return self._working_on
+
+    @contextlib.contextmanager
+    def track_time(self, tag):
+        """helper context manager to track the time spent on work unit tag"""
+        self._working_on = tag
+        self._work_started_at = self._timefunc()
+        try:
+            yield
+        finally:
+            # always reset, otherwise a failed unit would look stuck forever
+            self._work_started_at = None
+            self._working_on = None
+
+
+class Worker(TimeTrackingThread):
+    """
+    A regular worker thread. This pool has no additional
+    requirements besides the ability of a worker thread
+    to die ASAP (see the stop() method)
+    """
+    def __init__(self, work_queue):
+        super(Worker, self).__init__()
+        self.daemon = True
+        self._work_queue = work_queue
+        self._stop_event = threading.Event()  # not _stop: Thread uses it
+
+    def run(self):
+        logging.info('worker %s starting', self.name)
+
+        while not self._stop_event.is_set():
+            self.process()
+
+        logging.info('worker %s done', self.name)
+
+    def process(self):
+        """
+        execute a single try of work processing.
+        """
+        with self._pull() as (tag, work, args, kwargs):
+            if work is not None:
+                self._do_work(tag, work, args, kwargs)
+
+    def stop(self):
+        """
+        gracefully terminate ASAP, and avoid to begin more work.
+        """
+        self._stop_event.set()
+
+    @contextlib.contextmanager
+    def _pull(self):
+        """
+        pulls the next work item, or four Nones if nothing is due yet
+        """
+        try:
+            with self._work_queue.fetch() as item:
+                yield item
+        except schedqueue.NotYet:
+            yield None, None, None, None
+        except schedqueue.Empty:
+            yield None, None, None, None
+
+    def _do_work(self, tag, work, args, kwargs):
+        """
+        process a single work unit.
+        """
+        with self.track_time(tag):
+            try:
+                work(*args, **kwargs)
+            except Exception:
+                logging.exception('work failed')
diff --git a/lib/threadpool/workqueue.py b/lib/threadpool/workqueue.py
new file mode 100644
index 0000000..73949b8
--- /dev/null
+++ b/lib/threadpool/workqueue.py
@@ -0,0 +1,87 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import time
+import uuid
+
+import schedqueue
+
+
+class WorkQueue(object):
+    """
+    Tiny wrapper around a schedqueue.SchedQueue, to encapsulate
+    the 'periodic work' behaviour.
+    """
+    def __init__(self, wait_timeout=1,
+                 timefunc=time.time, delayfunc=time.sleep):
+        self._queue = schedqueue.SchedQueue(timefunc=timefunc)
+        self._work = {}  # tag -> (work, args, kwargs, interval)
+        self._wait_timeout = wait_timeout
+        self._timefunc = timefunc
+        self._delayfunc = delayfunc
+
+    def put(self, tag, interval, work, *args, **kwargs):
+        """
+        like Queue.put(): `tag` becomes due after `interval`.
+        """
+        item = (work, args, kwargs, interval)
+        self._work[tag] = item
+        self._queue.put(tag, delay=interval)
+
+    def get(self):
+        """
+        like Queue.get().
+        """
+        tag, work, args, kwargs, _ = self._get()
+        return (tag, work, args, kwargs)
+
+    def remove(self, tag):
+        return self._work.pop(tag, None)  # stale queued tags are skipped
+
+    def post(self, interval, work, *args, **kwargs):
+        """like put(), but with auto-tagging. Returns the new tag."""
+        tag = str(uuid.uuid4())
+        self.put(tag, interval, work, *args, **kwargs)
+        return tag
+
+    @contextlib.contextmanager
+    def fetch(self):
+        """like get(), but reinject periodic work unless removed meanwhile."""
+        tag, work, args, kwargs, interval = self._get()
+        yield (tag, work, args, kwargs)
+        if interval and tag in self._work:
+            self.put(tag, interval, work, *args, **kwargs)
+        else:
+            # one-shot work, or removed while running: drop the entry
+            self._work.pop(tag, None)
+
+    def _get(self):
+        tag, work, args, kwargs = None, None, None, None
+        while work is None:
+            self._delayfunc(self._wait_timeout)
+            tag = self._queue.get(self._timefunc())  # may raise Empty/NotYet
+            item = self._work.get(tag, None)
+            if item is not None:
+                work, args, kwargs, interval = item
+        return (tag, work, args, kwargs, interval)
+
+    def __len__(self):
+        return len(self._work)
--
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
