This new, very aptly named (i.e. yes, I like the name that much) but slightly weird object is at the base of the new select+pipe-based conditions. It is basically a pipe coupled with a poller and a usage counter. It's also my precious. :)
Signed-off-by: Guido Trotter <[email protected]> --- lib/locking.py | 73 +++++++++++++++++++++++++++++++++++++++ test/ganeti.locking_unittest.py | 47 +++++++++++++++++++++++++ 2 files changed, 120 insertions(+), 0 deletions(-) diff --git a/lib/locking.py b/lib/locking.py index 2f41d47..84d0441 100644 --- a/lib/locking.py +++ b/lib/locking.py @@ -23,6 +23,9 @@ # pylint: disable-msg=W0613,W0201 import threading +import os +import select + # Wouldn't it be better to define LockingError in the locking module? # Well, for now that's how the rest of the code does it... from ganeti import errors @@ -48,6 +51,76 @@ def ssynchronized(lock, shared=0): return wrap +class LazyCountingPollingPipe: + """Wrapper around a pipe for usage inside conditions. + + This class contains a pipe object, a poller that polls it, and takes care of + counting its users. It's lazy because it allocates the pipe only when it + actually starts to get used. + + Warning: this class is designed to be used as the underlying component of a + locking condition, but is not by itself thread safe, and needs to be + protected by the relevant locks. 
+ + @ivar read: read file descriptor (or None, before startUsing is called) + @ivar write: write file descriptor (or None, before startUsing is called) + + """ + def __init__(self): + """Constructor for the LazyCountingPollingPipe + + """ + self._poller = None + self.read = self.write = None + self.users = 0 + + def startUsing(self): + """Starts using the LazyCountingPollingPipe + + @rtype: polling object (as select.poll()) + @return: a poller for closure on the write side + + """ + self.users += 1 + if self.read is None: + (self.read, self.write) = os.pipe() + try: + self._poller = select.poll() + # the poller polls for closure of the write side + self._poller.register(self.read, select.POLLHUP) + except: + os.close(self.read) + os.close(self.write) + raise + + return self._poller + + def stopUsing(self): + """Stops using the LazyCountingPollingPipe + + Decrease the number of users, or if we're the last one close the pipe. + + """ + self.users -= 1 + if self.read is not None and self.users == 0: + try: + os.close(self.read) + self.read = None + finally: + self.closeWrite() + + def closeWrite(self): + """Close the write side of the LazyCountingPollingPipe + + This function is idempotent, and won't try to close the file descriptor + twice. + + """ + if self.write is not None: + os.close(self.write) + self.write = None + + class SharedLock: """Implements a shared lock. 
diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py index aee6860..2b54c99 100755 --- a/test/ganeti.locking_unittest.py +++ b/test/ganeti.locking_unittest.py @@ -67,6 +67,53 @@ class _ThreadedTestCase(unittest.TestCase): self.threads = [] +class TestLazyCountingPollingPipe(unittest.TestCase): + """LazyCountingPollingPipe tests""" + + def setUp(self): + self.lcpp = locking.LazyCountingPollingPipe() + + def testInitialization(self): + self.assertEqual(self.lcpp.read, None) + self.assertEqual(self.lcpp.write, None) + self.assertEqual(self.lcpp._poller, None) + self.assertEqual(self.lcpp.users, 0) + + def testUsageCount(self): + self.lcpp.startUsing() + self.assertNotEqual(self.lcpp.read, None) + self.assertNotEqual(self.lcpp.write, None) + self.assertEqual(self.lcpp.users, 1) + self.lcpp.startUsing() # use again + self.assertEqual(self.lcpp.users, 2) + self.lcpp.stopUsing() # there is more than one user + self.assertEqual(self.lcpp.users, 1) + self.assertNotEqual(self.lcpp.read, None) + self.assertNotEqual(self.lcpp.write, None) + self.lcpp.stopUsing() + self.assertEqual(self.lcpp.users, 0) + self.assertEqual(self.lcpp.read, None) + self.assertEqual(self.lcpp.write, None) + + def testWriteClose(self): + self.lcpp.startUsing() + self.lcpp.startUsing() + self.lcpp.closeWrite() + self.assertEqual(self.lcpp.write, None) + self.assertNotEqual(self.lcpp.read, None) + self.lcpp.stopUsing() + self.assertNotEqual(self.lcpp.read, None) + self.lcpp.stopUsing() + self.assertEqual(self.lcpp.read, None) + + def testReusage(self): + self.lcpp.startUsing() + self.lcpp.stopUsing() + # redo previous test, but after using it once + self.testUsageCount() + self.testWriteClose() + + class TestSharedLock(_ThreadedTestCase): """SharedLock tests""" -- 1.5.6.5
