This new, very aptly named (i.e. yes, I like the name that much) but
slightly weird object is at the base of the new select+pipe based
conditions. It is basically a pipe coupled with a poller and some usage
counters. It's also my precious. :)

Signed-off-by: Guido Trotter <[email protected]>
---
 lib/locking.py                  |   73 +++++++++++++++++++++++++++++++++++++++
 test/ganeti.locking_unittest.py |   47 +++++++++++++++++++++++++
 2 files changed, 120 insertions(+), 0 deletions(-)

diff --git a/lib/locking.py b/lib/locking.py
index 2f41d47..84d0441 100644
--- a/lib/locking.py
+++ b/lib/locking.py
@@ -23,6 +23,9 @@
 # pylint: disable-msg=W0613,W0201
 
 import threading
+import os
+import select
+
 # Wouldn't it be better to define LockingError in the locking module?
 # Well, for now that's how the rest of the code does it...
 from ganeti import errors
@@ -48,6 +51,76 @@ def ssynchronized(lock, shared=0):
   return wrap
 
 
+class LazyCountingPollingPipe:
+  """Wrapper around a pipe for usage inside conditions.
+
+  This class contains a pipe object, a poller that polls it, and takes care of
+  counting its users. It's lazy because it allocates the pipe only when it
+  actually starts to get used.
+
+  Warning: this class is designed to be used as the underlying component of a
+  locking condition, but is not by itself thread safe, and needs to be
+  protected by the relevant locks.
+
+  @ivar read: read file descriptor (or None, before startUsing is called)
+  @ivar write: write file descriptor (or None, before startUsing is called)
+
+  """
+  def __init__(self):
+    """Constructor for the LazyCountingPollingPipe
+
+    """
+    self._poller = None
+    self.read = self.write = None
+    self.users = 0
+
+  def startUsing(self):
+    """Starts using the LazyCountingPollingPipe
+
+    @rtype: polling object (as select.poll())
+    @return: a poller for closure on the write side
+
+    """
+    self.users += 1
+    if self.read is None:
+      (self.read, self.write) = os.pipe()
+      try:
+        self._poller = select.poll()
+        # the poller polls for closure of the write side
+        self._poller.register(self.read, select.POLLHUP)
+      except:
+        os.close(self.read)
+        os.close(self.write)
+        raise
+
+    return self._poller
+
+  def stopUsing(self):
+    """Stops using the LazyCountingPollingPipe
+
+    Decrease the number of users and, if we were the last one, close the pipe.
+
+    """
+    self.users -= 1
+    if self.read is not None and self.users == 0:
+      try:
+        os.close(self.read)
+        self.read = None
+      finally:
+        self.closeWrite()
+
+  def closeWrite(self):
+    """Close the write side of the LazyCountingPollingPipe
+
+    This function is idempotent, and won't try to close the file descriptor
+    twice.
+
+    """
+    if self.write is not None:
+      os.close(self.write)
+      self.write = None
+
+
 class SharedLock:
   """Implements a shared lock.
 
diff --git a/test/ganeti.locking_unittest.py b/test/ganeti.locking_unittest.py
index aee6860..2b54c99 100755
--- a/test/ganeti.locking_unittest.py
+++ b/test/ganeti.locking_unittest.py
@@ -67,6 +67,53 @@ class _ThreadedTestCase(unittest.TestCase):
     self.threads = []
 
 
+class TestLazyCountingPollingPipe(unittest.TestCase):
+  """LazyCountingPollingPipe tests"""
+
+  def setUp(self):
+    self.lcpp = locking.LazyCountingPollingPipe()
+
+  def testInitialization(self):
+    self.assertEqual(self.lcpp.read, None)
+    self.assertEqual(self.lcpp.write, None)
+    self.assertEqual(self.lcpp._poller, None)
+    self.assertEqual(self.lcpp.users, 0)
+
+  def testUsageCount(self):
+    self.lcpp.startUsing()
+    self.assertNotEqual(self.lcpp.read, None)
+    self.assertNotEqual(self.lcpp.write, None)
+    self.assertEqual(self.lcpp.users, 1)
+    self.lcpp.startUsing() # use again
+    self.assertEqual(self.lcpp.users, 2)
+    self.lcpp.stopUsing() # there is more than one user
+    self.assertEqual(self.lcpp.users, 1)
+    self.assertNotEqual(self.lcpp.read, None)
+    self.assertNotEqual(self.lcpp.write, None)
+    self.lcpp.stopUsing()
+    self.assertEqual(self.lcpp.users, 0)
+    self.assertEqual(self.lcpp.read, None)
+    self.assertEqual(self.lcpp.write, None)
+
+  def testWriteClose(self):
+    self.lcpp.startUsing()
+    self.lcpp.startUsing()
+    self.lcpp.closeWrite()
+    self.assertEqual(self.lcpp.write, None)
+    self.assertNotEqual(self.lcpp.read, None)
+    self.lcpp.stopUsing()
+    self.assertNotEqual(self.lcpp.read, None)
+    self.lcpp.stopUsing()
+    self.assertEqual(self.lcpp.read, None)
+
+  def testReusage(self):
+    self.lcpp.startUsing()
+    self.lcpp.stopUsing()
+    # redo the previous tests, but after having used the pipe once
+    self.testUsageCount()
+    self.testWriteClose()
+
+
 class TestSharedLock(_ThreadedTestCase):
   """SharedLock tests"""
 
-- 
1.5.6.5

Reply via email to