Francesco Romani has uploaded a new change for review.

Change subject: threadpool: add concurrent.futures compatibility
......................................................................
threadpool: add concurrent.futures compatibility Add a python futures compatibility module on top of the watched thread pool. Change-Id: Iad373cfc89a5a9bba184bce092dc255e37f97378 Signed-off-by: Francesco Romani <[email protected]> --- M debian/vdsm-python-threadpool.install M lib/threadpool/Makefile.am A lib/threadpool/futures.py M vdsm.spec.in 4 files changed, 111 insertions(+), 0 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/24/29424/1 diff --git a/debian/vdsm-python-threadpool.install b/debian/vdsm-python-threadpool.install index c53713e..4d510ca 100644 --- a/debian/vdsm-python-threadpool.install +++ b/debian/vdsm-python-threadpool.install @@ -1,4 +1,5 @@ ./usr/lib/python2.7/dist-packages/threadpool/__init__.py +./usr/lib/python2.7/dist-packages/threadpool/futures.py ./usr/lib/python2.7/dist-packages/threadpool/schedqueue.py ./usr/lib/python2.7/dist-packages/threadpool/watchedpool.py ./usr/lib/python2.7/dist-packages/threadpool/watchman.py diff --git a/lib/threadpool/Makefile.am b/lib/threadpool/Makefile.am index 8b7098e..106d90f 100644 --- a/lib/threadpool/Makefile.am +++ b/lib/threadpool/Makefile.am @@ -22,6 +22,7 @@ dist_threadpool_PYTHON = \ __init__.py \ + futures.py \ schedqueue.py \ watchedpool.py \ watchman.py \ diff --git a/lib/threadpool/futures.py b/lib/threadpool/futures.py new file mode 100644 index 0000000..15b0ace --- /dev/null +++ b/lib/threadpool/futures.py @@ -0,0 +1,108 @@ +# +# Copyright 2014 Red Hat, Inc. +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA +# +# Refer to the README and COPYING files for full details of the license +# + +""" +concurrent.futures compatibility API. +""" + +import sys + +from concurrent.futures import Future + +from .watchedpool import WatchedThreadPool + + +class WatchedThreadPoolExecutor(object): + """ + Provides an Executor compatible with the concurrent.futures API + but built on top of a WatchedThreadPool. + """ + def __init__(self, max_workers): + self._pool = WatchedThreadPool(max_workers) + self._pool.start() + + def submit(self, func, *args, **kwargs): + """ + Schedules the callable, func, to be executed as func(*args **kwargs) + and returns a Future object representing the execution of the + callable. + + >>> with ThreadPoolExecutor(max_workers=1) as executor: + >>> future = executor.submit(pow, 323, 1235) + >>> print(future.result()) + """ + future = Future() + + def _wrapper(*args, **kwargs): + if not future.set_running_or_notify_cancel(): + return + + try: + result = func(*args, **kwargs) + except BaseException: + exc = sys.exc_info()[1] + future.set_exception(exc) + else: + future.set_result(result) + + tag = self._pool.submit_periodic(0, _wrapper, *args, **kwargs) + if tag is None: + raise RuntimeError('pool not started') + + return future + + def map(self, func, *iterables, **kwargs): + # timeout = kwargs.get('timeout') + raise NotImplementedError + + def shutdown(self, wait=True): + """ + Signal the executor that it should free any resources that it is using + when the currently pending futures are done executing. + Calls to Executor.submit() and Executor.map() made after shutdown will + raise RuntimeError. 
+ + If wait is True then this method will not return until all the pending + futures are done executing and the resources associated with the + executor have been freed. If wait is False then this method will return + immediately and the resources associated with the executor will be + freed when all pending futures are done executing. + Regardless of the value of wait, the entire Python program will not + exit until all pending futures are done executing. + + You can avoid having to call this method explicitly if you use the + with statement, which will shutdown the Executor + (waiting as if Executor.shutdown() were called with wait set to True): + + >>> import shutil + >>> with ThreadPoolExecutor(max_workers=4) as e: + >>> e.submit(shutil.copy, 'src1.txt', 'dest1.txt') + >>> e.submit(shutil.copy, 'src2.txt', 'dest2.txt') + >>> e.submit(shutil.copy, 'src3.txt', 'dest3.txt') + >>> e.submit(shutil.copy, 'src3.txt', 'dest4.txt') + """ + return self._pool.stop(wait) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.shutdown(wait=True) + return False diff --git a/vdsm.spec.in b/vdsm.spec.in index 198e7e8..4688695 100644 --- a/vdsm.spec.in +++ b/vdsm.spec.in @@ -1438,6 +1438,7 @@ %files python-threadpool %{python_sitelib}/threadpool/__init__.py* +%{python_sitelib}/threadpool/futures.py* %{python_sitelib}/threadpool/schedqueue.py* %{python_sitelib}/threadpool/watchedpool.py* %{python_sitelib}/threadpool/watchman.py* -- To view, visit http://gerrit.ovirt.org/29424 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Iad373cfc89a5a9bba184bce092dc255e37f97378 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Francesco Romani <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
