Francesco Romani has uploaded a new change for review.

Change subject: threadpool: add concurrent.futures compatibility
......................................................................

threadpool: add concurrent.futures compatibility

Add a python futures compatibility module on top
of the watched thread pool.

Change-Id: Iad373cfc89a5a9bba184bce092dc255e37f97378
Signed-off-by: Francesco Romani <[email protected]>
---
M debian/vdsm-python-threadpool.install
M lib/threadpool/Makefile.am
A lib/threadpool/futures.py
M vdsm.spec.in
4 files changed, 111 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/24/29424/1

diff --git a/debian/vdsm-python-threadpool.install b/debian/vdsm-python-threadpool.install
index c53713e..4d510ca 100644
--- a/debian/vdsm-python-threadpool.install
+++ b/debian/vdsm-python-threadpool.install
@@ -1,4 +1,5 @@
 ./usr/lib/python2.7/dist-packages/threadpool/__init__.py
+./usr/lib/python2.7/dist-packages/threadpool/futures.py
 ./usr/lib/python2.7/dist-packages/threadpool/schedqueue.py
 ./usr/lib/python2.7/dist-packages/threadpool/watchedpool.py
 ./usr/lib/python2.7/dist-packages/threadpool/watchman.py
diff --git a/lib/threadpool/Makefile.am b/lib/threadpool/Makefile.am
index 8b7098e..106d90f 100644
--- a/lib/threadpool/Makefile.am
+++ b/lib/threadpool/Makefile.am
@@ -22,6 +22,7 @@
 
 dist_threadpool_PYTHON = \
        __init__.py \
+       futures.py \
        schedqueue.py \
        watchedpool.py \
        watchman.py \
diff --git a/lib/threadpool/futures.py b/lib/threadpool/futures.py
new file mode 100644
index 0000000..15b0ace
--- /dev/null
+++ b/lib/threadpool/futures.py
@@ -0,0 +1,108 @@
+#
+# Copyright 2014 Red Hat, Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+#
+# Refer to the README and COPYING files for full details of the license
+#
+
+"""
+concurrent.futures compatibility API.
+"""
+
+import sys
+
+from concurrent.futures import Future
+
+from .watchedpool import WatchedThreadPool
+
+
+class WatchedThreadPoolExecutor(object):
+    """
+    Provides an Executor compatible with the concurrent.futures API
+    but built on top of a WatchedThreadPool.
+    """
+    def __init__(self, max_workers):
+        self._pool = WatchedThreadPool(max_workers)
+        self._pool.start()
+
+    def submit(self, func, *args, **kwargs):
+        """
+        Schedules the callable, func, to be executed as func(*args, **kwargs)
+        and returns a Future object representing the execution of the
+        callable.
+
+        >>> with WatchedThreadPoolExecutor(max_workers=1) as executor:
+        ...     future = executor.submit(pow, 323, 1235)
+        ...     print(future.result())
+        """
+        future = Future()
+
+        def _wrapper(*args, **kwargs):
+            if not future.set_running_or_notify_cancel():
+                return  # the future was cancelled before it could start
+
+            try:
+                result = func(*args, **kwargs)
+            except BaseException:
+                exc = sys.exc_info()[1]
+                future.set_exception(exc)
+            else:
+                future.set_result(result)
+
+        tag = self._pool.submit_periodic(0, _wrapper, *args, **kwargs)
+        if tag is None:
+            raise RuntimeError('pool not started')
+
+        return future
+
+    def map(self, func, *iterables, **kwargs):
+        # Not implemented yet; a 'timeout' keyword argument is anticipated.
+        raise NotImplementedError
+
+    def shutdown(self, wait=True):
+        """
+        Signal the executor that it should free any resources that it is using
+        when the currently pending futures are done executing.
+        Calls to Executor.submit() and Executor.map() made after shutdown will
+        raise RuntimeError.
+
+        If wait is True then this method will not return until all the pending
+        futures are done executing and the resources associated with the
+        executor have been freed. If wait is False then this method will return
+        immediately and the resources associated with the executor will be
+        freed when all pending futures are done executing.
+        Regardless of the value of wait, the entire Python program will not
+        exit until all pending futures are done executing.
+
+        You can avoid having to call this method explicitly if you use the
+        with statement, which will shutdown the Executor
+        (waiting as if Executor.shutdown() were called with wait set to True):
+
+        >>> import shutil
+        >>> with WatchedThreadPoolExecutor(max_workers=4) as e:
+        ...     e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
+        ...     e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
+        ...     e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
+        ...     e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
+        """
+        return self._pool.stop(wait)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        self.shutdown(wait=True)
+        return False  # do not suppress an exception raised in the with body
diff --git a/vdsm.spec.in b/vdsm.spec.in
index 198e7e8..4688695 100644
--- a/vdsm.spec.in
+++ b/vdsm.spec.in
@@ -1438,6 +1438,7 @@
 
 %files python-threadpool
 %{python_sitelib}/threadpool/__init__.py*
+%{python_sitelib}/threadpool/futures.py*
 %{python_sitelib}/threadpool/schedqueue.py*
 %{python_sitelib}/threadpool/watchedpool.py*
 %{python_sitelib}/threadpool/watchman.py*


-- 
To view, visit http://gerrit.ovirt.org/29424
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: Iad373cfc89a5a9bba184bce092dc255e37f97378
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Francesco Romani <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to