Yaniv Bronhaim has uploaded a new change for review.

Change subject: Adding threads limitation to misc.tmap
......................................................................

Adding threads limitation to misc.tmap

Use a queue of threads to preserve thread order and limit the thread count.

I moved the function code next to itmap for readability.
Currently no one calls the tmap function, so I don't mind changing and
fixing it now to avoid future bugs.

Exceptions related to the queue usage will be raised as part of
the function's exceptions.

Signed-off-by: Yaniv Bronhaim <[email protected]>
Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625
---
M vdsm/storage/misc.py
1 file changed, 59 insertions(+), 36 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/58/8858/1

diff --git a/vdsm/storage/misc.py b/vdsm/storage/misc.py
index 426d181..f1a4581 100644
--- a/vdsm/storage/misc.py
+++ b/vdsm/storage/misc.py
@@ -1085,42 +1085,6 @@
         return tuple(res)
 
 
-def tmap(func, iterable):
-    resultsDict = {}
-    error = [None]
-
-    def wrapper(f, arg, index):
-        try:
-            resultsDict[index] = f(arg)
-        except Exception, e:
-            # We will throw the last error received
-            # we can only throw one error, and the
-            # last one is as good as any. This shouldn't
-            # happen. Wrapped methods should not throw
-            # exceptions, if this happens it's a bug
-            log.error("tmap caught an unexpected error", exc_info=True)
-            error[0] = e
-            resultsDict[index] = None
-
-    threads = []
-    for i, arg in enumerate(iterable):
-        t = threading.Thread(target=wrapper, args=(func, arg, i))
-        threads.append(t)
-        t.start()
-
-    for t in threads:
-        t.join()
-
-    results = [None] * len(resultsDict)
-    for i, result in resultsDict.iteritems():
-        results[i] = result
-
-    if error[0] is not None:
-        raise error[0]
-
-    return tuple(results)
-
-
 def getfds():
     return [int(fd) for fd in os.listdir("/proc/self/fd")]
 
@@ -1251,6 +1215,65 @@
         raise exception
 
 
+def tmap(func, iterable, maxthreads=UNLIMITED_THREADS):
+    """
+    Make an iterator that computes the function using arguments from the
+    iterable by running each operation in a different thread in specific
+    order.
+    maxthreads stands for maximum threads that we can initiate simultaneosly.
+               If we reached to max threads the function waits for thread to
+               finish before initiate the next one.
+    """
+    if maxthreads < 1 and maxthreads != UNLIMITED_THREADS:
+        raise ValueError("Wrong input passed to function tmap: %s", maxthreads)
+
+    resultsDict = {}
+    error = [None]
+
+    def wrapper(f, arg, index):
+        try:
+            resultsDict[index] = f(arg)
+        except Exception, e:
+            # We will throw the last error received
+            # we can only throw one error, and the
+            # last one is as good as any. This shouldn't
+            # happen. Wrapped methods should not throw
+            # exceptions, if this happens it's a bug
+            log.error("tmap caught an unexpected error", exc_info=True)
+            error[0] = e
+            resultsDict[index] = None
+
+    if maxthreads != UNLIMITED_THREADS:
+        threadsQueue = Queue.Queue(maxthreads)
+    else:
+        threadsQueue = Queue.Queue()
+
+    for i, arg in enumerate(iterable):
+        if not threadsQueue.full():
+            t = threading.Thread(target=wrapper, args=(func, arg, i))
+            threadsQueue.put_nowait(t)
+            t.start()
+        else:
+            # waits for the first unfinished thread in list to finish if we
+            # have already initiate all possible thread's slots (maxthreads)
+            if threadsQueue.empty():
+                raise RuntimeError("No thread initieated")
+            else:
+                threadsQueue.get_nowait().join()
+
+    while not threadsQueue.empty():
+        threadsQueue.get(False).join()
+
+    results = [None] * len(resultsDict)
+    for i, result in resultsDict.iteritems():
+        results[i] = result
+
+    if error[0] is not None:
+        raise error[0]
+
+    return tuple(results)
+
+
 def itmap(func, iterable, maxthreads=UNLIMITED_THREADS):
     """
     Make an iterator that computes the function using


--
To view, visit http://gerrit.ovirt.org/8858
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Yaniv Bronhaim <[email protected]>
_______________________________________________
vdsm-patches mailing list
[email protected]
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches

Reply via email to