CountingGroupCondition is a condition implemented using pipes and select, which allows timeouts to be handled properly without busy-waiting. Unlike Python conditions, to save file descriptors and an internal queue, it can only be used to notify all waiters: more than one condition, and an external queue of conditions, are needed to wake them up one at a time. Since this is the usage envisioned in Ganeti, we're happy with this.
Signed-off-by: Guido Trotter <[email protected]> --- lib/locking.py | 100 ++++++++++++++++++++++++++++++++++++++- test/ganeti.locking_unittest.py | 88 ++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+), 1 deletions(-) diff --git a/lib/locking.py b/lib/locking.py index 84d0441..2bf59ee 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -25,6 +25,7 @@ import threading import os import select +import time # Wouldn't it be better to define LockingError in the locking module? # Well, for now that's how the rest of the code does it... @@ -121,6 +122,103 @@ class LazyCountingPollingPipe: self.write = None +class CountingGroupCondition: + """Group-only non-polling condition with counters. + + This condition class uses pipes and poll, internally, to be able to wait for + notification with a timeout, without resorting to polling. + + It is almost compatible with a python Condition, but only supports notifyAll. + + As an additional features it's able to report the number of waiting threads. + + Some of the code is copied from python's threading.Condition. + + """ + def __init__(self, lock=None): + if lock is None: + lock = threading.Lock() + self.__lock = lock + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release + # If the lock defines _release_save() and/or _acquire_restore(), + # these override the default implementations (which just call + # release() and acquire() on the lock). Ditto for _is_owned(). 
+ try: + self._release_save = lock._release_save + except AttributeError: + pass + try: + self._acquire_restore = lock._acquire_restore + except AttributeError: + pass + try: + self._is_owned = lock._is_owned + except AttributeError: + pass + self.__nwaiters = 0 + self.__pipe = LazyCountingPollingPipe() + + def _release_save(self): + self.__lock.release() # No state to save + + def _acquire_restore(self, x): + self.__lock.acquire() # Ignore saved state + + def __repr__(self): + return "<GroupCondition(%s, %d)>" % (self.__lock, self.__nwaiters) + + def _is_owned(self): + # Return True if lock is owned by currentThread. + # This method is called only if __lock doesn't have _is_owned(). + if self.__lock.acquire(0): + self.__lock.release() + return False + else: + return True + + def wait(self, timeout=None): + if not self._is_owned(): + raise RuntimeError("cannot wait on un-aquired lock") + self.__nwaiters += 1 + try: + my_pipe = self.__pipe + poller = my_pipe.startUsing() + try: + saved_state = self._release_save() + try: + start_time = time.time() + remaining_time = timeout + while timeout is None or remaining_time > 0: + try: + result = poller.poll(remaining_time) + except EnvironmentError, err: + if err.errno != errno.EINTR: + raise + else: + if result and result[0][0] == my_pipe.read: + break + if timeout is not None: + remaining_time = start_time - time.time() + timeout + finally: + self._acquire_restore(saved_state) + finally: + my_pipe.stopUsing() + finally: + self.__nwaiters -= 1 + + def notifyAll(self): + if not self._is_owned(): + raise RuntimeError("cannot notify on un-aquired lock") + if self.__pipe is not None: + self.__pipe.closeWrite() + self.__pipe = LazyCountingPollingPipe() + + def waiters(self): + return self.__nwaiters + + class SharedLock: """Implements a shared lock. @@ -137,7 +235,7 @@ class SharedLock: """Construct a new SharedLock""" # we have two conditions, c_shr and c_exc, sharing the same lock. 
self.__lock = threading.Lock() - self.__turn_shr = threading.Condition(self.__lock) + self.__turn_shr = CountingGroupCondition(self.__lock) self.__turn_exc = threading.Condition(self.__lock) # current lock holders diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index 2b54c99..0866f4d 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -114,6 +114,94 @@ class TestLazyCountingPollingPipe(unittest.TestCase): self.testWriteClose() +class TestCountingGroupCondition(_ThreadedTestCase): + """SharedLock tests""" + + def setUp(self): + _ThreadedTestCase.setUp(self) + self.cgc = locking.CountingGroupCondition() + self.done = Queue.Queue(0) + + # helper functions: called in a separate thread they acquire the lock, wait + # their identifier on the done queue when done. + def _doItWaiter(self): + self.cgc.acquire() + self.done.put('A') + self.cgc.wait() + self.cgc.release() + self.done.put('W') + + def _doItZeroTimer(self): + self.cgc.acquire() + self.cgc.wait(0) + self.cgc.release() + self.done.put('T0') + + def _doItSomeTimer(self): + self.cgc.acquire() + self.cgc.wait(0.1) + self.cgc.release() + self.done.put('T1') + + def testAcquireRelease(self): + self.assertRaises(RuntimeError, self.cgc.wait) + self.assertRaises(RuntimeError, self.cgc.notifyAll) + self.cgc.acquire() + self.cgc.notifyAll() + self.cgc.release() + self.assertRaises(RuntimeError, self.cgc.wait) + self.assertRaises(RuntimeError, self.cgc.notifyAll) + + def testWaiters(self): + self._addThread(target=self._doItWaiter) + self._addThread(target=self._doItWaiter) + self._addThread(target=self._doItWaiter) + self.assertEqual(self.done.get(True, 1), 'A') + self.assertEqual(self.done.get(True, 1), 'A') + self.assertEqual(self.done.get(True, 1), 'A') + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.cgc.acquire() + self.assertEqual(self.cgc.waiters(), 3) + # This new thread can't acquire the lock, and thus call wait, before we + # release it 
+ self._addThread(target=self._doItWaiter) + self.cgc.notifyAll() + self.assertRaises(Queue.Empty, self.done.get_nowait) + self.cgc.release() + # We should now get 3 W and 1 A (for the new thread) in whatever order + w = 0 + a = 0 + for i in range(4): + got = self.done.get(True, 1) + if got == 'W': + w += 1 + elif got == 'A': + a += 1 + else: + self.assert_(False, "Got %s on the done queue" % got) + self.assertEqual(w, 3) + self.assertEqual(a, 1) + self.cgc.acquire() + self.cgc.notifyAll() + self.cgc.release() + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'W') + + def testTimeoutWaiters(self): + self._addThread(target=self._doItZeroTimer) + self._addThread(target=self._doItZeroTimer) + self._addThread(target=self._doItZeroTimer) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'T0') + self.assertEqual(self.done.get_nowait(), 'T0') + self.assertEqual(self.done.get_nowait(), 'T0') + self._addThread(target=self._doItSomeTimer) + self._addThread(target=self._doItSomeTimer) + self._waitThreads() + self.assertEqual(self.done.get_nowait(), 'T1') + self.assertEqual(self.done.get_nowait(), 'T1') + + class TestSharedLock(_ThreadedTestCase): """SharedLock tests""" -- 1.5.6.5
