Yaniv Bronhaim has uploaded a new change for review. Change subject: Adding threads limitation to misc.tmap ......................................................................
Adding threads limitation to misc.tmap Using a threads queue to keep the threads' order and limit the threads count. I moved the function code next to itmap for readability. Currently no one calls the tmap function, so I don't mind changing and fixing it now to avoid future bugs. Exceptions related to the queue usage will be raised as part of the function's exceptions. Signed-off-by: Yaniv Bronhaim <[email protected]> Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 --- M vdsm/storage/misc.py 1 file changed, 59 insertions(+), 36 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/58/8858/1 diff --git a/vdsm/storage/misc.py b/vdsm/storage/misc.py index 426d181..f1a4581 100644 --- a/vdsm/storage/misc.py +++ b/vdsm/storage/misc.py @@ -1085,42 +1085,6 @@ return tuple(res) -def tmap(func, iterable): - resultsDict = {} - error = [None] - - def wrapper(f, arg, index): - try: - resultsDict[index] = f(arg) - except Exception, e: - # We will throw the last error received - # we can only throw one error, and the - # last one is as good as any. This shouldn't - # happen. Wrapped methods should not throw - # exceptions, if this happens it's a bug - log.error("tmap caught an unexpected error", exc_info=True) - error[0] = e - resultsDict[index] = None - - threads = [] - for i, arg in enumerate(iterable): - t = threading.Thread(target=wrapper, args=(func, arg, i)) - threads.append(t) - t.start() - - for t in threads: - t.join() - - results = [None] * len(resultsDict) - for i, result in resultsDict.iteritems(): - results[i] = result - - if error[0] is not None: - raise error[0] - - return tuple(results) - - def getfds(): return [int(fd) for fd in os.listdir("/proc/self/fd")] @@ -1251,6 +1215,65 @@ raise exception +def tmap(func, iterable, maxthreads=UNLIMITED_THREADS): + """ + Make an iterator that computes the function using arguments from the + iterable by running each operation in a different thread in specific + order. 
+ maxthreads stands for maximum threads that we can initiate simultaneosly. + If we reached to max threads the function waits for thread to + finish before initiate the next one. + """ + if maxthreads < 1 and maxthreads != UNLIMITED_THREADS: + raise ValueError("Wrong input passed to function tmap: %s", maxthreads) + + resultsDict = {} + error = [None] + + def wrapper(f, arg, index): + try: + resultsDict[index] = f(arg) + except Exception, e: + # We will throw the last error received + # we can only throw one error, and the + # last one is as good as any. This shouldn't + # happen. Wrapped methods should not throw + # exceptions, if this happens it's a bug + log.error("tmap caught an unexpected error", exc_info=True) + error[0] = e + resultsDict[index] = None + + if maxthreads != UNLIMITED_THREADS: + threadsQueue = Queue.Queue(maxthreads) + else: + threadsQueue = Queue.Queue() + + for i, arg in enumerate(iterable): + if not threadsQueue.full(): + t = threading.Thread(target=wrapper, args=(func, arg, i)) + threadsQueue.put_nowait(t) + t.start() + else: + # waits for the first unfinished thread in list to finish if we + # have already initiate all possible thread's slots (maxthreads) + if threadsQueue.empty(): + raise RuntimeError("No thread initieated") + else: + threadsQueue.get_nowait().join() + + while not threadsQueue.empty(): + threadsQueue.get(False).join() + + results = [None] * len(resultsDict) + for i, result in resultsDict.iteritems(): + results[i] = result + + if error[0] is not None: + raise error[0] + + return tuple(results) + + def itmap(func, iterable, maxthreads=UNLIMITED_THREADS): """ Make an iterator that computes the function using -- To view, visit http://gerrit.ovirt.org/8858 To unsubscribe, visit http://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I07845bfd78b9215e8994ac2ebe46a7ff78c85625 Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Yaniv Bronhaim <[email protected]> 
_______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
