Francesco Romani has uploaded a new change for review.

Change subject: threadpool: add a thread pool with watchdog
......................................................................

threadpool: add a thread pool with watchdog

Add a new thread pool implementation with built-in
watchdog.

The watchdog is needed to accommodate the needs of
the virt sampling code.
The virt sampling needs to deal with possibly blocking,
I/O-like calls which cannot be made non-blocking.

Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Signed-off-by: Francesco Romani <[email protected]>
---
M lib/threadpool/Makefile.am
A lib/threadpool/examples/demo.py
A lib/threadpool/schedqueue.py
A lib/threadpool/tests/test_schedqueue.py
A lib/threadpool/tests/test_timetrackingthread.py
A lib/threadpool/tests/test_worker.py
A lib/threadpool/tests/test_workqueue.py
A lib/threadpool/watchedpool.py
A lib/threadpool/watchman.py
A lib/threadpool/worker.py
A lib/threadpool/workqueue.py
11 files changed, 1,003 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/91/29191/1

diff --git a/lib/threadpool/Makefile.am b/lib/threadpool/Makefile.am
index 3a71a5d..8b7098e 100644
--- a/lib/threadpool/Makefile.am
+++ b/lib/threadpool/Makefile.am
@@ -22,4 +22,9 @@
 
 dist_threadpool_PYTHON = \
        __init__.py \
+       schedqueue.py \
+       watchedpool.py \
+       watchman.py \
+       worker.py \
+       workqueue.py \
        $(NULL)
diff --git a/lib/threadpool/examples/demo.py b/lib/threadpool/examples/demo.py
new file mode 100644
index 0000000..cf80930
--- /dev/null
+++ b/lib/threadpool/examples/demo.py
@@ -0,0 +1,62 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import logging
+import random
+import sys
+import time
+
+from watchedpool import WatchedThreadPool
+
+
+def _test_fun(*args):
+    """
+    just block the caller for a random time span.
+    """
+    tmo = random.randint(1, 15)
+    logging.info('started, waiting: %.3fs', tmo)
+    time.sleep(tmo)
+    logging.info('done')
+
+
+def _main(args):
+    """
+    main test driver
+    """
+    nworkers = int(args[0]) if len(args) >= 1 else 1
+    pool = WatchedThreadPool(nworkers)
+    pool.start()
+    for tag in range(nworkers):
+        pool.submit_periodic(4, _test_fun)
+
+    try:
+        while True:
+            time.sleep(1.0)
+    except KeyboardInterrupt:
+        logging.debug('exiting!')
+    finally:
+        pool.stop()
+
+
+if __name__ == '__main__':
+    logging.basicConfig(
+        level=logging.DEBUG,
+        format='%(asctime)s %(threadName)-10s %(levelname)-8s %(message)s')
+    _main(sys.argv[1:])
diff --git a/lib/threadpool/schedqueue.py b/lib/threadpool/schedqueue.py
new file mode 100644
index 0000000..c2bfbae
--- /dev/null
+++ b/lib/threadpool/schedqueue.py
@@ -0,0 +1,123 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+Time-aware priority queue, built like queue.Queue
+but tailored for WatchedThreadPool needs.
+"""
+
+import heapq
+import threading
+import time
+
+
+class QueueError(Exception):
+    """
+    Queue exception root.
+    """
+
+
+class Empty(QueueError):
+    """
+    Attempt to get an item from an empty queue.
+    """
+
+
+class Full(QueueError):
+    """
+    Attempt to put an item into a full queue.
+    """
+
+
+class NotYet(QueueError):
+    """
+    Attempt to get an item which is not due at the
+    present (reported) time.
+    """
+
+
+class SchedQueue(object):
+    """
+    Time-aware, non-blocking (client must take care of
+    waiting/timing) priority queue.
+    """
+    def __init__(self, maxsize=0, timefunc=time.time):
+        self._timefunc = timefunc
+        self._maxsize = maxsize
+        self._queue = self._init(maxsize)
+        self._lock = threading.Lock()
+
+    def __len__(self):
+        with self._lock:
+            return self._size()
+
+    def put(self, item, delay=0):
+        """
+        Put an item into the queue, to be made ready 'delay' seconds
+        in the future.
+        """
+        with self._lock:
+            if self._maxsize > 0 and self._size() == self._maxsize:
+                raise Full
+            self._put((self._timefunc() + delay, item))
+
+    def get(self, timenow):
+        """
+        Ask for an item to be consumed.
+        """
+        with self._lock:
+            if not self._size():
+                raise Empty
+            timeval, _ = self._peek()
+            if timeval > timenow:
+                raise NotYet
+            _, item = self._get()
+            return item
+
+    def _init(self, maxsize):
+        """
+        builds the internal queue.
+        """
+        self._maxsize = maxsize  # to make pylint happy
+        return []
+
+    def _size(self):
+        """
+        size of the internal queue.
+        """
+        return len(self._queue)
+
+    def _put(self, item):
+        """
+        push a new element into the heap-ordered queue.
+        """
+        heapq.heappush(self._queue, item)
+
+    def _get(self):
+        """
+        gets (and removes) the next element in queue.
+        """
+        return heapq.heappop(self._queue)
+
+    def _peek(self):
+        """
+        read-only view of the next element in queue.
+        """
+        return self._queue[0]
diff --git a/lib/threadpool/tests/test_schedqueue.py b/lib/threadpool/tests/test_schedqueue.py
new file mode 100644
index 0000000..3595589
--- /dev/null
+++ b/lib/threadpool/tests/test_schedqueue.py
@@ -0,0 +1,56 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import schedqueue
+
+
+class SchedQueueTests(unittest.TestCase):
+    def _faketime(self):
+        ret = self._ts
+        self._ts += 1
+        return ret
+
+    def setUp(self):
+        self._ts = 0
+        self._sq = schedqueue.SchedQueue(timefunc=self._faketime)
+
+    def test_empty_on_create(self):
+        self.assertEqual(len(self._sq), 0)
+
+    def test_put_on_full(self):
+        self._sq = schedqueue.SchedQueue(maxsize=1, timefunc=self._faketime)
+        self._sq.put('foo')
+        self.assertRaises(schedqueue.Full, self._sq.put, 'bar')
+
+    def test_get_on_empty(self):
+        self.assertRaises(schedqueue.Empty, self._sq.get, 1)
+
+    def test_get_too_early(self):
+        item = 'foo'
+        self._sq.put(item, delay=5)
+        self.assertRaises(schedqueue.NotYet, self._sq.get, 0)
+
+    def test_put_get(self):
+        item = 'foo'
+        self._sq.put(item)
+        res = self._sq.get(timenow=1)
+        self.assertEqual(res, item)
diff --git a/lib/threadpool/tests/test_timetrackingthread.py b/lib/threadpool/tests/test_timetrackingthread.py
new file mode 100644
index 0000000..14a25ef
--- /dev/null
+++ b/lib/threadpool/tests/test_timetrackingthread.py
@@ -0,0 +1,64 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import worker
+
+
+class TimeTrackingThreadTests(unittest.TestCase):
+    def _fakesleep(self, delay):
+        self._ts += max(0, delay)  # avoid negative delays
+
+    def _faketime(self):
+        ret = self._ts
+        self._ts += 1
+        return ret
+
+    def setUp(self):
+        self._ts = 0
+        self._ttt = worker.TimeTrackingThread(self._faketime)
+
+    def assertIdle(self):
+        self.assertFalse(self._ttt.busy)
+        self.assertEqual(self._ttt.task, None)
+
+    def assertBusyOn(self, task):
+        self.assertTrue(self._ttt.busy)
+        self.assertEqual(self._ttt.task, task)
+
+    def test_created_free(self):
+        self.assertIdle()
+
+    def test_carry_task(self):
+        self.assertIdle()
+
+        task = 'foobar'
+        with self._ttt.track_time(task):
+            self.assertBusyOn(task)
+
+        self.assertIdle()
+
+    def test_elapsed(self):
+        task = 'barbaz'
+        delay = 5
+        with self._ttt.track_time(task):
+            self._fakesleep(delay)
+            self.assertTrue(self._ttt.elapsed >= delay)
diff --git a/lib/threadpool/tests/test_worker.py b/lib/threadpool/tests/test_worker.py
new file mode 100644
index 0000000..7396673
--- /dev/null
+++ b/lib/threadpool/tests/test_worker.py
@@ -0,0 +1,60 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import unittest
+
+import schedqueue
+import worker
+
+
+class FakeWorkQueue(object):
+    def __init__(self, items):
+        self._items = items
+
+    @contextlib.contextmanager
+    def fetch(self):
+        if not self._items:
+            raise schedqueue.Empty
+        item = self._items.pop(0)
+        if issubclass(item, Exception):
+            raise item
+        else:
+            yield item
+
+
+class WorkerThreadTests(unittest.TestCase):
+    def test__pull_empty(self):
+        wq = FakeWorkQueue([])
+        wt = worker.Worker(wq)
+        with wt._pull() as item:
+            self.assertEqual(item, (None, None, None, None))
+
+    def test__pull_empty_exc(self):
+        wq = FakeWorkQueue([schedqueue.Empty])
+        wt = worker.Worker(wq)
+        with wt._pull() as item:
+            self.assertEqual(item, (None, None, None, None))
+
+    def test__pull_notyet(self):
+        wq = FakeWorkQueue([schedqueue.NotYet])
+        wt = worker.Worker(wq)
+        with wt._pull() as item:
+            self.assertEqual(item, (None, None, None, None))
diff --git a/lib/threadpool/tests/test_workqueue.py b/lib/threadpool/tests/test_workqueue.py
new file mode 100644
index 0000000..68e982d
--- /dev/null
+++ b/lib/threadpool/tests/test_workqueue.py
@@ -0,0 +1,80 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import unittest
+
+import schedqueue
+import watchedpool
+
+
+def _do_nothing(*args, **kwargs):
+    pass
+
+
+class WorkQueueTests(unittest.TestCase):
+    def _fakesleep(self, delay):
+        pass
+
+    def _faketime(self):
+        ret = self._ts
+        self._ts += 1
+        return ret
+
+    def assertDummyWork(self, workitem, in_tag=None):
+        out_tag, work, args, kwargs = workitem
+        if in_tag is not None:
+            self.assertEqual(in_tag, out_tag)
+        self.assertEqual(work, _do_nothing)
+        self.assertEqual(len(args), 0)
+        self.assertEqual(len(kwargs), 0)
+
+    def setUp(self):
+        self._ts = 0
+        self._wq = watchedpool.WorkQueue(
+            timefunc=self._faketime,
+            delayfunc=self._fakesleep)
+
+    def test_created_empty(self):
+        self.assertEqual(len(self._wq), 0)
+
+    def test_get_on_empty(self):
+        self.assertRaises(schedqueue.Empty, self._wq.get)
+
+    def test_post_on_empty(self):
+        self._wq.post(0, _do_nothing)
+        self.assertEqual(len(self._wq), 1)
+
+    def test_put_get(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 0, _do_nothing)
+        self.assertDummyWork(self._wq.get())
+
+    def test_remove(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 0, _do_nothing)
+        self._wq.remove(in_tag)
+        self.assertRaises(schedqueue.Empty, self._wq.get)
+
+    def test_fetch(self):
+        in_tag = 'foo'
+        self._wq.put(in_tag, 1, _do_nothing)
+        with self._wq.fetch() as work_item:
+            self.assertDummyWork(work_item)
+        self.assertEqual(len(self._wq), 1)
diff --git a/lib/threadpool/watchedpool.py b/lib/threadpool/watchedpool.py
new file mode 100644
index 0000000..6a8938f
--- /dev/null
+++ b/lib/threadpool/watchedpool.py
@@ -0,0 +1,128 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+PoC of a watched thread pool implementation.
+This thread pool adds a watcher/watchman thread which detaches `rogue'
+threads from the main pool, and asks them to die as soon as possible.
+The purpose is to avoid worker threads getting stuck for too long on
+busy, possibly blocking operations (e.g. risky I/O).
+
+In the worst case scenario, the pool size degenerates into a
+thread-per-operation model. In the common and expected case,
+the pool size is fixed and rogue threads are just spikes
+in the thread number.
+
+TODO:
+- allow the client to dynamically resize the pool [?]
+- let this code be easily mergeable with vdsm/storage/threadPool.py
+- demonstrate that this code does not leak threads.
+"""
+
+import logging
+import threading
+
+from watchman import Watchman
+from worker import Worker
+from workqueue import WorkQueue
+
+
+class WatchedThreadPool(object):
+    """
+    Straightforward thread pool, with an added Watchman.
+    """
+    def __init__(self, nworkers=1, work_interval=1):
+        self._nworkers = nworkers
+        self._work_interval = work_interval
+        self._watch_interval = 2 * self._work_interval
+        self._busy_timeout = 3 * self._watch_interval
+        self._work_queue = None
+        self._workers = None
+        self._watcher = None
+        self._running = False
+        self._lock = threading.Lock()
+
+    def start(self):
+        """
+        starts the pool. Thread safe.
+        """
+        with self._lock:
+            if not self._running:
+                self._work_queue = WorkQueue(self._work_interval)
+                self._workers = [self._make_worker()
+                                 for _ in range(self._nworkers)]
+                self._watcher = Watchman(self._workers,
+                                         self._make_worker,
+                                         self._watch_interval,
+                                         self._busy_timeout)
+                self._watcher.start()
+                logging.info('pool started')
+                self._running = True
+
+    def stop(self, wait=True):
+        """
+        terminate all pooled threads. Thread safe.
+        """
+        with self._lock:
+            if self._running:
+                self._running = False
+
+                self._watcher.stop()
+                if wait:
+                    for worker in self._workers:
+                        worker.join()
+                        del worker
+                    self._watcher.reap([])  # last chance
+                    self._watcher.join()
+                    del self._watcher
+
+    def submit_periodic(self, interval, work, *args, **kwargs):
+        """
+        submit a new work unit to the pool.
+        returns the work unit tag.
+        The tag is guaranteed to be unique per-pool.
+        """
+        if self._running:
+            tag = self._work_queue.post(interval, work, args, kwargs)
+            logging.debug('new work: (%s, %s, %s) -> %s',
+                          str(work), str(args), str(kwargs), tag)
+            return tag  # FIXME
+        else:
+            return None
+
+    def remove(self, tag):
+        """
+        remove a work unit from the pool.
+        """
+        return self._work_queue.remove(tag)
+
+    def notify_timeout(self, tag, on_timeout):
+        """
+        register a timeout callback for the given work unit.
+        """
+        self._watcher.notify_timeout(tag, on_timeout)
+
+    def _make_worker(self):
+        """
+        creates a new idle (not started) worker.
+        """
+        worker = Worker(self._work_queue)
+        logging.info('worker %s added to the pool', worker.name)
+        return worker
diff --git a/lib/threadpool/watchman.py b/lib/threadpool/watchman.py
new file mode 100644
index 0000000..53bfc11
--- /dev/null
+++ b/lib/threadpool/watchman.py
@@ -0,0 +1,209 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+PoC of a watched thread pool implementation.
+This thread pool adds a watcher/watchman thread which detaches `rogue'
+threads from the main pool, and asks them to die as soon as possible.
+The purpose is to avoid worker threads getting stuck for too long on
+busy, possibly blocking operations (e.g. risky I/O).
+
+In the worst case scenario, the pool size degenerates into a
+thread-per-operation model. In the common and expected case,
+the pool size is fixed and rogue threads are just spikes
+in the thread number.
+
+TODO:
+- allow the client to dynamically resize the pool [?]
+- let this code be easily mergeable with vdsm/storage/threadPool.py
+- demonstrate that this code does not leak threads.
+"""
+
+import logging
+import threading
+import time
+
+
+class Watchman(threading.Thread):
+    """
+    Watches the worker threads, detects the rogues, if any,
+    and possibly replaces them to ensure the pool is operative
+    and not blocked.
+    """
+    def __init__(self, workers, make_worker,
+                 watch_interval=5, busy_timeout=10,
+                 rogues_cap=None, delayfunc=time.sleep):
+        super(Watchman, self).__init__()
+        self.daemon = True
+        self._workers = workers
+        self._rogues = []
+        self._rogues_cap = rogues_cap
+        self._watch_interval = watch_interval
+        self._make_worker = make_worker
+        self._busy_timeout = busy_timeout
+        self._timeout_cb = {}
+        self._stop = threading.Event()
+        self._running = True
+        self._delayfunc = delayfunc
+
+    def suspend(self):
+        """
+        suspend the watch, without stopping.
+        """
+        self._running = False
+
+    def resume(self):
+        """
+        resume the watch.
+        """
+        self._running = True
+
+    def start(self):
+        """
+        start the watched pool and the watcher itself.
+        """
+        for worker in self._workers:
+            worker.start()
+        super(Watchman, self).start()
+
+    def stop(self):
+        """
+        stop the watched pool, and the watcher itself.
+        """
+        self.suspend()
+        for worker in self._workers:
+            worker.stop()
+        self._stop.set()
+
+    def run(self):
+        logging.debug(
+            'watcher starting (with %i workers)', len(self._workers))
+
+        while not self._stop.is_set():
+            self._delayfunc(self._watch_interval)
+            if self._running:
+                workers, rogues, newbies = self.verify()
+                self.enforce(workers, rogues, newbies)
+
+        logging.debug('watcher done')
+
+    def notify_timeout(self, tag, on_timeout):
+        """
+        register a timeout callback for the given work unit.
+        """
+        self._timeout_cb[tag] = on_timeout
+
+    def stuck(self, worker):
+        """
+        is a given worker stuck?
+        """
+        stuck, task = False, None
+        if worker.busy:
+            # use locals to freeze the state
+            task = worker.task
+            elapsed = worker.elapsed
+            stuck = elapsed > self._busy_timeout
+            logging.warning(
+                '%s busy on %s for the last %02i seconds (max=%02i) -> %s',
+                worker.name, task, elapsed, self._busy_timeout,
+                'STUCK' if stuck else 'OK')
+        return stuck, task
+
+    def verify(self):
+        """
+        check the health of the worker pool, and classify
+        it into
+        - workers: well behaving worker threads
+        - rogues: workers got stuck, to be evicted from the pool
+        - newbies: new workers ready to replace rogues
+        """
+        try:
+            workers, rogues, newbies = [], [], []
+            logging.debug('verification loop started')
+            for worker in self._workers:
+                stuck, task = self.stuck(worker)
+                if not stuck:
+                    workers.append(worker)
+                elif not self._room_for_rogues(rogues):
+                    logging.warning(
+                        'worker %s gone rogue, but cap reached', worker.name)
+                elif not self._is_detachable(task):
+                    logging.warning(
+                        'worker %s gone rogue, but task %s not detachable',
+                        worker.name, task)
+                else:
+                    logging.info(
+                        'worker %s gone rogue, replacing', worker.name)
+                    worker.stop()
+                    rogues.append(worker)  # until it dies
+                    newbies.append(self._make_worker())
+            logging.debug('verification loop done: %i rogues', len(rogues))
+            return workers, rogues, newbies
+        except Exception:
+            logging.exception('verify failed', exc_info=True)
+
+    def enforce(self, workers, rogues, newbies):
+        """
+        enforce the pool size by evicting rogues into a limbo,
+        and by replenishing the worker pool with the provided
+        newbies.
+        """
+        try:
+            for newbie in newbies:
+                newbie.start()
+                workers.append(newbie)
+            self._workers = workers
+            self.reap(rogues)
+        except Exception:
+            logging.exception('enforce failed', exc_info=True)
+
+    def reap(self, rogues):
+        """
+        collect the terminated rogues.
+        """
+        rogues.extend(self._rogues)
+        new_rogues = []
+        for rogue in rogues:
+            if rogue.is_alive():
+                new_rogues.append(rogue)
+            else:
+                logging.info('rogue thread %s collected', rogue.name)
+                rogue.join()
+                del rogue
+        self._rogues = new_rogues
+
+    def _room_for_rogues(self, rogues):
+        return (self._rogues_cap is None
+                or len(rogues) < self._rogues_cap)
+
+    def _is_detachable(self, task):
+        """
+        if a task is detachable, its worker thread will be detached
+        by the pool. Otherwise the thread will be kept in the pool
+        """
+        on_timeout = self._timeout_cb.get(task, _on_timeout_dummy)
+        return on_timeout(task)
+
+
+def _on_timeout_dummy(*args):
+    """
+    Dummy timeout callback. Marks every work unit as detachable.
+    """
+    return True
diff --git a/lib/threadpool/worker.py b/lib/threadpool/worker.py
new file mode 100644
index 0000000..fe63d34
--- /dev/null
+++ b/lib/threadpool/worker.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import logging
+import threading
+import time
+
+import schedqueue
+
+
+class TimeTrackingThread(threading.Thread):
+    """
+    Thin threading.Thread subclass which helps a worker thread
+    track the time consumed by a work item.
+    """
+    def __init__(self, timefunc=time.time):
+        super(TimeTrackingThread, self).__init__()
+        self._work_started_at = None
+        self._working_on = None
+        self._timefunc = timefunc
+
+    @property
+    def busy(self):
+        """
+        is the thread doing some work or is it idle?
+        """
+        return self._work_started_at is not None
+
+    @property
+    def elapsed(self):
+        """
+        how much time since the thread started the current work unit?
+        """
+        return self._timefunc() - self._work_started_at
+
+    @property
+    def task(self):
+        """
+        what is this worker doing?
+        """
+        return self._working_on
+
+    @contextlib.contextmanager
+    def track_time(self, tag):
+        """
+        helper context manager to track the time consumed in
+        the current work unit
+        """
+        self._working_on = tag
+        self._work_started_at = self._timefunc()
+        yield
+        self._work_started_at = None
+        self._working_on = None
+
+
+class Worker(TimeTrackingThread):
+    """
+    A regular worker thread. This pool has no additional
+    requirements save for the ability of a worker thread
+    to die ASAP (see the stop() method)
+    """
+    def __init__(self, work_queue):
+        super(Worker, self).__init__()
+        self.daemon = True
+        self._work_queue = work_queue
+        self._stop = threading.Event()
+
+    def run(self):
+        logging.info('worker %s starting', self.name)
+
+        while not self._stop.is_set():
+            self.process()
+
+        logging.info('worker %s done', self.name)
+
+    def process(self):
+        """
+        execute a single try of work processing.
+        """
+        with self._pull() as (tag, work, args, kwargs):
+            if work is not None:
+                self._do_work(tag, work, args, kwargs)
+
+    def stop(self):
+        """
+        gracefully terminate ASAP, and avoid starting more work.
+        """
+        self._stop.set()
+
+    @contextlib.contextmanager
+    def _pull(self):
+        """
+        pulls the next work item
+        """
+        try:
+            with self._work_queue.fetch() as item:
+                yield item
+        except schedqueue.NotYet:
+            yield None, None, None, None
+        except schedqueue.Empty:
+            yield None, None, None, None
+
+    def _do_work(self, tag, work, args, kwargs):
+        """
+        process a single work unit.
+        """
+        with self.track_time(tag):
+            try:
+                work(*args, **kwargs)
+            except Exception:
+                logging.exception('work failed', exc_info=True)
diff --git a/lib/threadpool/workqueue.py b/lib/threadpool/workqueue.py
new file mode 100644
index 0000000..73949b8
--- /dev/null
+++ b/lib/threadpool/workqueue.py
@@ -0,0 +1,87 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+import contextlib
+import time
+import uuid
+
+import schedqueue
+
+
+class WorkQueue(object):
+    """
+    Tiny wrapper around a schedqueue.SchedQueue, to encapsulate
+    the 'periodic work' behaviour.
+    """
+    def __init__(self, wait_timeout=1,
+                 timefunc=time.time, delayfunc=time.sleep):
+        self._queue = schedqueue.SchedQueue(timefunc=timefunc)
+        self._work = {}
+        self._wait_timeout = wait_timeout
+        self._timefunc = timefunc
+        self._delayfunc = delayfunc
+
+    def put(self, tag, interval, work, *args, **kwargs):
+        """
+        like Queue.put().
+        """
+        item = (work, args, kwargs, interval)
+        self._work[tag] = item
+        self._queue.put(tag, delay=interval)
+
+    def get(self):
+        """
+        like Queue.get().
+        """
+        tag, work, args, kwargs, _ = self._get()
+        return (tag, work, args, kwargs)
+
+    def remove(self, tag):
+        return self._work.pop(tag, None)
+
+    def post(self, interval, work, *args, **kwargs):
+        """
+        like put(), but with auto-tagging.
+        """
+        return self.put(str(uuid.uuid4()), interval, work, *args, **kwargs)
+
+    @contextlib.contextmanager
+    def fetch(self):
+        """
+        like get(), but automatically re-injects the work
+        into the queue if the task is periodic.
+        """
+        tag, work, args, kwargs, interval = self._get()
+        yield (tag, work, args, kwargs)
+        if interval:
+            self.put(tag, interval, work, args, kwargs)
+
+    def _get(self):
+        tag, work, args, kwargs = None, None, None, None
+        while work is None:
+            self._delayfunc(self._wait_timeout)
+            tag = self._queue.get(self._timefunc())
+            item = self._work.get(tag, None)
+            if item is not None:
+                work, args, kwargs, interval = item
+        return (tag, work, args, kwargs, interval)
+
+    def __len__(self):
+        return len(self._work)


-- 
To view, visit http://gerrit.ovirt.org/29191
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic06da1ba57868dc2c7db67a1868ad10087a1cff2
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to