Nir Soffer has uploaded a new change for review.

Change subject: concurrent: Add CountedEvent class
......................................................................

concurrent: Add CountedEvent class

CountedEvent is like a threading.Event, but it notifies waiters only
after it was set multiple times.

CountedEvent is useful for waiting until multiple threads are ready.
Currently we are synchronizing multiple threads in the tests using
timeouts, which make the tests slower and break randomly on overloaded
CI slaves.

Change-Id: I68525e18f6b3774d7e10af1226a7bc3404c68ae9
Signed-off-by: Nir Soffer <nsof...@redhat.com>
---
M lib/vdsm/concurrent.py
M tests/concurrentTests.py
2 files changed, 79 insertions(+), 0 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/vdsm refs/changes/27/42927/1

diff --git a/lib/vdsm/concurrent.py b/lib/vdsm/concurrent.py
index 3865873..20d0e09 100644
--- a/lib/vdsm/concurrent.py
+++ b/lib/vdsm/concurrent.py
@@ -21,6 +21,51 @@
 from __future__ import absolute_import
 import threading
 from collections import namedtuple
+from . import utils
+
+
+class Timeout(Exception):
+    """ Raised when operation timedout """
+
+
+class CountedEvent(object):
+    """
+    An Event like object that notify waiters only after it was set multiple
+    times.
+
+    Typical usage:
+
+        ready = CountedEvent(5)
+
+        def thread():
+            ready.set()
+
+        for i in range(5):
+            threading.Thread(target=thread).start()
+
+        # Will block until all 5 threads invoke ready.set()
+        ready.wait()
+
+    """
+
+    def __init__(self, count):
+        self._count = count
+        self._cond = threading.Condition(threading.Lock())
+
+    def set(self):
+        with self._cond:
+            self._count -= 1
+            if self._count <= 0:
+                self._cond.notify_all()
+
+    def wait(self, timeout):
+        deadline = utils.monotonic_time() + timeout
+        with self._cond:
+            while self._count > 0:
+                now = utils.monotonic_time()
+                if now >= deadline:
+                    raise Timeout("Timeout waiting for counted event")
+                self._cond.wait(deadline - now)
 
 
 Result = namedtuple("Result", ["succeeded", "value"])
diff --git a/tests/concurrentTests.py b/tests/concurrentTests.py
index 307e397..6bd6496 100644
--- a/tests/concurrentTests.py
+++ b/tests/concurrentTests.py
@@ -20,12 +20,46 @@
 
 import time
 import random
+import threading
 
 from testlib import VdsmTestCase
+from testValidation import slowtest
 
 from vdsm import concurrent
 
 
+class CountedEventTests(VdsmTestCase):
+
+    def test_wait_until_set_multiple_times(self):
+        event = concurrent.CountedEvent(2)
+        self.assertRaises(concurrent.Timeout, event.wait, 0.0)
+        event.set()
+        self.assertRaises(concurrent.Timeout, event.wait, 0.0)
+        event.set()
+        event.wait(0)
+
+    @slowtest
+    def test_timeout(self):
+        event = concurrent.CountedEvent(1)
+        with self.assertElapsed(1.0, tolerance=0.5):
+            self.assertRaises(concurrent.Timeout, event.wait, 1.0)
+
+    def test_wait_for_multiple_threads(self):
+        count = 5
+        event = concurrent.CountedEvent(count)
+        threads = []
+        try:
+            for i in range(count):
+                t = threading.Thread(target=event.set)
+                t.daemon = True
+                t.start()
+                threads.append(t)
+            event.wait(2)
+        finally:
+            for t in threads:
+                t.join()
+
+
 class TMapTests(VdsmTestCase):
 
     def test_results(self):


-- 
To view, visit https://gerrit.ovirt.org/42927
To unsubscribe, visit https://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I68525e18f6b3774d7e10af1226a7bc3404c68ae9
Gerrit-PatchSet: 1
Gerrit-Project: vdsm
Gerrit-Branch: master
Gerrit-Owner: Nir Soffer <nsof...@redhat.com>
_______________________________________________
vdsm-patches mailing list
vdsm-patches@lists.fedorahosted.org
https://lists.fedorahosted.org/mailman/listinfo/vdsm-patches