Nir Soffer has uploaded a new change for review. Change subject: lib: Simplify and generalize concurrent.tmap() ......................................................................
lib: Simplify and generalize concurrent.tmap() The previous version had few issues: - Used non-daemon threads, so the caller application may be blocked on shutdown if a mapped function get stuck. - Assumed that mapped functions do not raise, limiting the usefulness of this utility. - Logged exceptions for functions that do raise - libraries should not log behind you back. - Drop all errors but the last one, so the caller cannot handle them. - Assume that all errors are equal, and raise the last error. This version fixes these issues by making tmap() simpler and more generic. The function is mapped to the values, and the result is a 2 namedtuple (succeeded, value), holding either the result of the function, or the exception raised by the function. The caller is responsible now for handling the errors, and logging them if needed. Change-Id: I4337db65fe1b326157507b1424a8b4924b91210f Signed-off-by: Nir Soffer <[email protected]> --- M lib/vdsm/concurrent.py M tests/concurrentTests.py 2 files changed, 24 insertions(+), 29 deletions(-) git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/65/38465/1 diff --git a/lib/vdsm/concurrent.py b/lib/vdsm/concurrent.py index f4886b1..64e072d 100644 --- a/lib/vdsm/concurrent.py +++ b/lib/vdsm/concurrent.py @@ -18,43 +18,31 @@ # Refer to the README and COPYING files for full details of the license # -import logging import threading +from collections import namedtuple -log = logging.getLogger("vds.concurrent") + +Result = namedtuple("Result", ["succeeded", "value"]) def tmap(func, iterable): - resultsDict = {} - error = [None] + args = list(iterable) + results = [None] * len(args) - def wrapper(f, arg, index): + def worker(i, f, arg): try: - resultsDict[index] = f(arg) + results[i] = Result(True, f(arg)) except Exception as e: - # We will throw the last error received - # we can only throw one error, and the - # last one is as good as any. This shouldn't - # happen. Wrapped methods should not throw - # exceptions, if this happens it's a bug - log.error("tmap caught an unexpected error", exc_info=True) - error[0] = e - resultsDict[index] = None + results[i] = Result(False, e) threads = [] - for i, arg in enumerate(iterable): - t = threading.Thread(target=wrapper, args=(func, arg, i)) - threads.append(t) + for i, arg in enumerate(args): + t = threading.Thread(target=worker, args=(i, func, arg)) + t.daemon = True t.start() + threads.append(t) for t in threads: t.join() - results = [None] * len(resultsDict) - for i, result in resultsDict.iteritems(): - results[i] = result - - if error[0] is not None: - raise error[0] - - return tuple(results) + return results diff --git a/tests/concurrentTests.py b/tests/concurrentTests.py index 46ad44c..307e397 100644 --- a/tests/concurrentTests.py +++ b/tests/concurrentTests.py @@ -31,7 +31,8 @@ def test_results(self): values = tuple(range(10)) results = concurrent.tmap(lambda x: x, values) - self.assertEqual(results, values) + expected = [concurrent.Result(True, x) for x in values] + self.assertEqual(results, expected) def test_results_order(self): def func(x): @@ -39,7 +40,8 @@ return x values = tuple(random.random() * 0.1 for x in range(10)) results = concurrent.tmap(func, values) - self.assertEqual(results, values) + expected = [concurrent.Result(True, x) for x in values] + self.assertEqual(results, expected) def test_concurrency(self): start = time.time() @@ -48,6 +50,11 @@ self.assertTrue(0.1 < elapsed < 0.2) def test_error(self): + error = RuntimeError("No result for you!") + def func(x): - raise RuntimeError() - self.assertRaises(RuntimeError, concurrent.tmap, func, range(10)) + raise error + + results = concurrent.tmap(func, range(10)) + expected = [concurrent.Result(False, error)] * 10 + self.assertEqual(results, expected) -- To view, visit https://gerrit.ovirt.org/38465 To unsubscribe, visit https://gerrit.ovirt.org/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I4337db65fe1b326157507b1424a8b4924b91210f Gerrit-PatchSet: 1 Gerrit-Project: vdsm Gerrit-Branch: master Gerrit-Owner: Nir Soffer <[email protected]> _______________________________________________ vdsm-patches mailing list [email protected] https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches
