...as in a single-threaded process there are no conflicts.

Signed-off-by: Klaus Aehlig <[email protected]>
---
 lib/jqueue/__init__.py            | 34 +---------------------------------
 test/py/ganeti.jqueue_unittest.py | 37 -------------------------------------
 2 files changed, 1 insertion(+), 70 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index 0aa9665..286f929 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -30,9 +30,6 @@
 
 """Module implementing the job queue handling.
 
-Locking: there's a single, large lock in the L{JobQueue} class. It's
-used by all other classes in this module.
-
 """
 
 import logging
@@ -73,10 +70,6 @@ from ganeti import vcluster
 from ganeti.cmdlib import cluster
 
 
-# member lock names to be passed to @ssynchronized decorator
-_LOCK = "_lock"
-_QUEUE = "_queue"
-
 #: Retrieves "id" attribute
 _GetIdAttr = operator.attrgetter("id")
 
@@ -578,7 +571,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
-  @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
 
@@ -603,7 +595,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     # And finally replicate the job status
     self._queue.UpdateJobUnlocked(self._job)
 
-  @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyRetry(self):
     """Mark opcode again as lock-waiting.
 
@@ -614,7 +605,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     self._op.status = constants.OP_STATUS_WAITING
     logging.debug("Opcode will be retried. Back to waiting.")
 
-  @locking.ssynchronized(_QUEUE, shared=1)
   def _AppendFeedback(self, timestamp, log_type, log_msg):
     """Internal feedback append function, with locks
 
@@ -966,7 +956,6 @@ class _JobProcessor(object):
 
     logging.debug("Processing job %s", job.id)
 
-    queue.acquire(shared=1)
     try:
       opcount = len(job.ops)
 
@@ -1029,11 +1018,7 @@ class _JobProcessor(object):
 
           assert not opctx.jobdeps, "Not all dependencies were removed"
 
-          queue.release()
-          try:
-            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
-          finally:
-            queue.acquire(shared=1)
+          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
 
           op.status = op_status
           op.result = op_result
@@ -1138,7 +1123,6 @@ class _JobProcessor(object):
         return self.DEFER
     finally:
       assert job.writable, "Job became read-only while being processed"
-      queue.release()
 
 
 class _JobDependencyManager:
@@ -1158,9 +1142,7 @@ class _JobDependencyManager:
     self._getstatus_fn = getstatus_fn
 
     self._waiters = {}
-    self._lock = locking.SharedLock("JobDepMgr")
 
-  @locking.ssynchronized(_LOCK, shared=1)
   def JobWaiting(self, job):
     """Checks if a job is waiting.
 
@@ -1168,7 +1150,6 @@ class _JobDependencyManager:
     return compat.any(job in jobs
                       for jobs in self._waiters.values())
 
-  @locking.ssynchronized(_LOCK)
   def CheckAndRegister(self, job, dep_job_id, dep_status):
     """Checks if a dependency job has the requested status.
 
@@ -1257,16 +1238,6 @@ class JobQueue(object):
     self._memcache = weakref.WeakValueDictionary()
     self._my_hostname = netutils.Hostname.GetSysName()
 
-    # The Big JobQueue lock. If a code block or method acquires it in shared
-    # mode safe it must guarantee concurrency with all the code acquiring it in
-    # shared mode, including itself. In order not to acquire it at all
-    # concurrency must be guaranteed with all code acquiring it in shared mode
-    # and all code acquiring it exclusively.
-    self._lock = locking.SharedLock("JobQueue")
-
-    self.acquire = self._lock.acquire
-    self.release = self._lock.release
-
     # Get initial list of nodes
     self._nodes = dict((n.name, n.primary_ip)
                        for n in cfg.GetAllNodesInfo().values()
@@ -1290,7 +1261,6 @@ class JobQueue(object):
     """
     return rpc.JobQueueRunner(self.context, address_list)
 
-  @locking.ssynchronized(_LOCK)
   def AddNode(self, node):
     """Register a new node with the queue.
 
@@ -1730,7 +1700,6 @@ class JobQueue(object):
     else:
       return None
 
-  @locking.ssynchronized(_LOCK)
   def CancelJob(self, job_id):
     """Cancels a job.
 
@@ -1744,7 +1713,6 @@ class JobQueue(object):
 
     return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
 
-  @locking.ssynchronized(_LOCK)
   def ChangeJobPriority(self, job_id, priority):
     """Changes a job's priority.
 
diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py
index c13749d..c258f65 100755
--- a/test/py/ganeti.jqueue_unittest.py
+++ b/test/py/ganeti.jqueue_unittest.py
@@ -516,7 +516,6 @@ class _DisabledFakeDependencyManager:
 
 class _FakeQueueForProc:
   def __init__(self, depmgr=None):
-    self._acquired = False
     self._updates = []
     self._submitted = []
 
@@ -527,29 +526,16 @@ class _FakeQueueForProc:
     else:
       self.depmgr = _DisabledFakeDependencyManager()
 
-  def IsAcquired(self):
-    return self._acquired
-
   def GetNextUpdate(self):
     return self._updates.pop(0)
 
   def GetNextSubmittedJob(self):
     return self._submitted.pop(0)
 
-  def acquire(self, shared=0):
-    assert shared == 1
-    self._acquired = True
-
-  def release(self):
-    assert self._acquired
-    self._acquired = False
-
   def UpdateJobUnlocked(self, job, replicate=True):
-    assert self._acquired, "Lock not acquired while updating job"
     self._updates.append((job, bool(replicate)))
 
   def SubmitManyJobs(self, jobs):
-    assert not self._acquired, "Lock acquired while submitting jobs"
     job_ids = [self._submit_count.next() for _ in jobs]
     self._submitted.extend(zip(job_ids, jobs))
     return job_ids
@@ -563,8 +549,6 @@ class _FakeExecOpCodeForProc:
 
   def __call__(self, op, cbs, timeout=None):
     assert isinstance(op, opcodes.OpTestDummy)
-    assert not self._queue.IsAcquired(), \
-           "Queue lock not released when executing opcode"
 
     if self._before_start:
       self._before_start(timeout, cbs.CurrentPriority())
@@ -574,9 +558,6 @@ class _FakeExecOpCodeForProc:
     if self._after_start:
       self._after_start(op, cbs)
 
-    # Check again after the callbacks
-    assert not self._queue.IsAcquired()
-
     if op.fail:
       raise errors.OpExecError("Error requested (%s)" % op.result)
 
@@ -620,7 +601,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       def _BeforeStart(timeout, priority):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
         self.assertRaises(IndexError, queue.GetNextUpdate)
-        self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
         self.assertFalse(job.cur_opctx)
 
@@ -628,7 +608,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         self.assertEqual(queue.GetNextUpdate(), (job, True))
         self.assertRaises(IndexError, queue.GetNextUpdate)
 
-        self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
         self.assertFalse(job.cur_opctx)
 
@@ -820,7 +799,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     def _BeforeStart(timeout, priority):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
       # Mark as cancelled
@@ -834,7 +812,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     def _AfterStart(op, cbs):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
     opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
@@ -865,7 +842,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
     def _BeforeStart(timeout, priority):
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
       # Mark as cancelled
@@ -1049,13 +1025,11 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     def _BeforeStart(timeout, priority):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     def _AfterStart(op, cbs):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
       self.assertRaises(AssertionError, cbs.Feedback,
@@ -1140,7 +1114,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     def _BeforeStart(timeout, priority):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
@@ -1148,7 +1121,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
       self.assertFalse(job.cur_opctx)
 
@@ -1217,7 +1189,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         # Job should only be updated when it wasn't waiting for another job
         self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
@@ -1225,7 +1196,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
       self.assertFalse(job.cur_opctx)
 
@@ -1336,7 +1306,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         # Job should only be updated when it wasn't waiting for another job
         self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
@@ -1344,7 +1313,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
       self.assertFalse(job.cur_opctx)
 
@@ -1445,7 +1413,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
         # Job should only be updated when it wasn't waiting for another job
         self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
       self.assertFalse(job.cur_opctx)
 
@@ -1453,7 +1420,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(queue.GetNextUpdate(), (job, True))
       self.assertRaises(IndexError, queue.GetNextUpdate)
 
-      self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
       self.assertFalse(job.cur_opctx)
 
@@ -1570,7 +1536,6 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
       self.assertEqual(self.queue.GetNextUpdate(), (job, True))
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
-    self.assertFalse(self.queue.IsAcquired())
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
 
     ts = self.timeout_strategy
@@ -1608,7 +1573,6 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(self.queue.GetNextUpdate(), (job, True))
     self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
-    self.assertFalse(self.queue.IsAcquired())
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
 
     # Job is running, cancelling shouldn't be possible
@@ -1765,7 +1729,6 @@ class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
     self.opexecprio = []
 
   def _BeforeStart(self, timeout, priority):
-    self.assertFalse(self.queue.IsAcquired())
     self.opexecprio.append(priority)
 
   def testChangePriorityWhileRunning(self):
-- 
2.2.0.rc0.207.ga3a616c

Reply via email to