Francesco Romani has uploaded a new change for review.

Change subject: WIP: threadpool: add compatibility module
......................................................................
WIP: threadpool: add compatibility module WORK IN PROGRESS Add a compatibility layer with the existing thread pool implementation, used in the storage subsystem. Change-Id: I88f788605b23697f51a5fc6690f902f22272a89e Signed-off-by: Francesco Romani <[email protected]> --- M debian/vdsm-python-threadpool.install M lib/threadpool/Makefile.am A lib/threadpool/compat.py A lib/threadpool/examples/demo_threadpool.py R lib/threadpool/examples/demo_watchedpool.py M vdsm.spec.in 6 files changed, 161 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/28/29428/1 diff --git a/debian/vdsm-python-threadpool.install b/debian/vdsm-python-threadpool.install index 4d510ca..bea1282 100644 --- a/debian/vdsm-python-threadpool.install +++ b/debian/vdsm-python-threadpool.install @@ -1,4 +1,5 @@ ./usr/lib/python2.7/dist-packages/threadpool/__init__.py +./usr/lib/python2.7/dist-packages/threadpool/compat.py ./usr/lib/python2.7/dist-packages/threadpool/futures.py ./usr/lib/python2.7/dist-packages/threadpool/schedqueue.py ./usr/lib/python2.7/dist-packages/threadpool/watchedpool.py diff --git a/lib/threadpool/Makefile.am b/lib/threadpool/Makefile.am index 106d90f..5259a3c 100644 --- a/lib/threadpool/Makefile.am +++ b/lib/threadpool/Makefile.am @@ -22,6 +22,7 @@ dist_threadpool_PYTHON = \ __init__.py \ + compat.py \ futures.py \ schedqueue.py \ watchedpool.py \ diff --git a/lib/threadpool/compat.py b/lib/threadpool/compat.py new file mode 100644 index 0000000..22546b1 --- /dev/null +++ b/lib/threadpool/compat.py @@ -0,0 +1,88 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. 
+# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +""" +plain thread pool +""" + +import logging +import threading + +from . import worker +from . import workqueue + + +def _process_task(id, cmd, args, callback): + if id is None: # should retry. + pass + elif callback is None: + threading.current_thread().setName(id) + logging.debug("Task: %s running: %s with: %s" % + (id, repr(cmd), repr(args))) + cmd(args) + else: + threading.current_thread().setName(id) + callback(cmd(args)) + + +class ThreadPool(object): + def __init__(self, numThreads, waitTimeout=3, maxTasks=100): + self._work_interval = waitTimeout + self._max_size = maxTasks + self._nworkers = numThreads + self._running = False + self._work_queue = None + self._workers = [] + self.start() + + def start(self): + if not self._running: + self._work_queue = workqueue.WorkQueue( + self._work_interval, self._max_size) + for _ in range(self._nworkers): + worker = worker.Worker(self._work_queue) + worker.start() + self._workers.append(worker) + logging.info('pool started') + self._running = True + + def joinAll(self, waitForThreads=True): + """ + terminate all pooled threads. Thread safe. 
+ """ + self._running = False + + for worker in self._workers: + worker.stop() + if waitForThreads: + worker.join() + del worker + + def queueTask(self, id, task, args=None, taskCallback=None): + if not self._running: + return False + if not callable(task): + return False + + self._work_queue.post(0, + _process_task, + (id, task, args, taskCallback)) + + return True diff --git a/lib/threadpool/examples/demo_threadpool.py b/lib/threadpool/examples/demo_threadpool.py new file mode 100644 index 0000000..18e9fd2 --- /dev/null +++ b/lib/threadpool/examples/demo_threadpool.py @@ -0,0 +1,70 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +from random import randrange + +from compat import ThreadPool + + +# Sample task 1: given a start and end value, shuffle integers, +# then sort them + +def sortTask(data): + print "SortTask starting for ", data + numbers = range(data[0], data[1]) + for a in numbers: + rnd = randrange(0, len(numbers) - 1) + a, numbers[rnd] = numbers[rnd], a + print "SortTask sorting for ", data + numbers.sort() + print "SortTask done for ", data + return "Sorter ", data + +# Sample task 2: just sleep for a number of seconds. 
+ +def waitTask(data): + print "WaitTask starting for ", data + print "WaitTask sleeping for %d seconds" % data + sleep(data) + return "Waiter ", data + +# Both tasks use the same callback + +def taskCallback(data): + print "Callback called for", data + +# Create a pool with three worker threads + +pool = ThreadPool(100) + +# Insert tasks into the queue and let them run +print "Running tasks: ", pool.getRunningTasks(), "\n" +pool.queueTask(sortTask, (1000, 100000), taskCallback) +print "Running tasks: ", pool.getRunningTasks(), "\n" +pool.queueTask(waitTask, 5, taskCallback) +pool.queueTask(sortTask, (200, 200000), taskCallback) +pool.queueTask(waitTask, 2, taskCallback) +print "Running tasks: ", pool.getRunningTasks(), "\n" +pool.queueTask(sortTask, (3, 30000), taskCallback) +pool.queueTask(waitTask, 7, taskCallback) + +# When all tasks are finished, allow the threads to terminate +pool.joinAll() +print "ThreadPool sample done.\n" diff --git a/lib/threadpool/examples/demo.py b/lib/threadpool/examples/demo_watchedpool.py similarity index 100% rename from lib/threadpool/examples/demo.py rename to lib/threadpool/examples/demo_watchedpool.py diff --git a/vdsm.spec.in b/vdsm.spec.in index 4688695..808c3a0 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -1438,6 +1438,7 @@ %files python-threadpool %{python_sitelib}/threadpool/__init__.py* +%{python_sitelib}/threadpool/compat.py* %{python_sitelib}/threadpool/futures.py* %{python_sitelib}/threadpool/schedqueue.py* %{python_sitelib}/threadpool/watchedpool.py* -- To view, visit http://gerrit.ovirt.org/29428 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I88f788605b23697f51a5fc6690f902f22272a89e Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Francesco Romani <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
