On Thu, May 21, 2015 at 04:00:08PM +0200, 'Klaus Aehlig' via ganeti-devel wrote:
...as in a single-threaded process there are no conflicts.

Signed-off-by: Klaus Aehlig <[email protected]>
---
lib/jqueue/__init__.py            | 34 +---------------------------------
test/py/ganeti.jqueue_unittest.py | 37 -------------------------------------
2 files changed, 1 insertion(+), 70 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index 0aa9665..286f929 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -30,9 +30,6 @@

"""Module implementing the job queue handling.

-Locking: there's a single, large lock in the L{JobQueue} class. It's
-used by all other classes in this module.
-
"""

import logging
@@ -73,10 +70,6 @@ from ganeti import vcluster
from ganeti.cmdlib import cluster


-# member lock names to be passed to @ssynchronized decorator
-_LOCK = "_lock"
-_QUEUE = "_queue"
-
#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")

@@ -578,7 +571,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
      logging.debug("Canceling opcode")
      raise CancelJob()

-  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyStart(self):
    """Mark the opcode as running, not lock-waiting.

@@ -603,7 +595,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
    # And finally replicate the job status
    self._queue.UpdateJobUnlocked(self._job)

-  @locking.ssynchronized(_QUEUE, shared=1)
  def NotifyRetry(self):
    """Mark opcode again as lock-waiting.

@@ -614,7 +605,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
    self._op.status = constants.OP_STATUS_WAITING
    logging.debug("Opcode will be retried. Back to waiting.")

-  @locking.ssynchronized(_QUEUE, shared=1)
  def _AppendFeedback(self, timestamp, log_type, log_msg):
    """Internal feedback append function, with locks

@@ -966,7 +956,6 @@ class _JobProcessor(object):

    logging.debug("Processing job %s", job.id)

-    queue.acquire(shared=1)
    try:
      opcount = len(job.ops)

@@ -1029,11 +1018,7 @@ class _JobProcessor(object):

          assert not opctx.jobdeps, "Not all dependencies were removed"

-          queue.release()
-          try:
-            (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
-          finally:
-            queue.acquire(shared=1)
+          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)

          op.status = op_status
          op.result = op_result
@@ -1138,7 +1123,6 @@ class _JobProcessor(object):
        return self.DEFER
    finally:
      assert job.writable, "Job became read-only while being processed"
-      queue.release()


class _JobDependencyManager:
@@ -1158,9 +1142,7 @@ class _JobDependencyManager:
    self._getstatus_fn = getstatus_fn

    self._waiters = {}
-    self._lock = locking.SharedLock("JobDepMgr")

-  @locking.ssynchronized(_LOCK, shared=1)
  def JobWaiting(self, job):
    """Checks if a job is waiting.

@@ -1168,7 +1150,6 @@ class _JobDependencyManager:
    return compat.any(job in jobs
                      for jobs in self._waiters.values())

-  @locking.ssynchronized(_LOCK)
  def CheckAndRegister(self, job, dep_job_id, dep_status):
    """Checks if a dependency job has the requested status.

@@ -1257,16 +1238,6 @@ class JobQueue(object):
    self._memcache = weakref.WeakValueDictionary()
    self._my_hostname = netutils.Hostname.GetSysName()

-    # The Big JobQueue lock. If a code block or method acquires it in shared
-    # mode safe it must guarantee concurrency with all the code acquiring it in
-    # shared mode, including itself. In order not to acquire it at all
-    # concurrency must be guaranteed with all code acquiring it in shared mode
-    # and all code acquiring it exclusively.
-    self._lock = locking.SharedLock("JobQueue")
-
-    self.acquire = self._lock.acquire
-    self.release = self._lock.release
-
    # Get initial list of nodes
    self._nodes = dict((n.name, n.primary_ip)
                       for n in cfg.GetAllNodesInfo().values()
@@ -1290,7 +1261,6 @@ class JobQueue(object):
    """
    return rpc.JobQueueRunner(self.context, address_list)

-  @locking.ssynchronized(_LOCK)
  def AddNode(self, node):
    """Register a new node with the queue.

@@ -1730,7 +1700,6 @@ class JobQueue(object):
    else:
      return None

-  @locking.ssynchronized(_LOCK)
  def CancelJob(self, job_id):
    """Cancels a job.

@@ -1744,7 +1713,6 @@ class JobQueue(object):

    return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())

-  @locking.ssynchronized(_LOCK)
  def ChangeJobPriority(self, job_id, priority):
    """Changes a job's priority.

diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py
index c13749d..c258f65 100755
--- a/test/py/ganeti.jqueue_unittest.py
+++ b/test/py/ganeti.jqueue_unittest.py
@@ -516,7 +516,6 @@ class _DisabledFakeDependencyManager:

class _FakeQueueForProc:
  def __init__(self, depmgr=None):
-    self._acquired = False
    self._updates = []
    self._submitted = []

@@ -527,29 +526,16 @@ class _FakeQueueForProc:
    else:
      self.depmgr = _DisabledFakeDependencyManager()

-  def IsAcquired(self):
-    return self._acquired
-
  def GetNextUpdate(self):
    return self._updates.pop(0)

  def GetNextSubmittedJob(self):
    return self._submitted.pop(0)

-  def acquire(self, shared=0):
-    assert shared == 1
-    self._acquired = True
-
-  def release(self):
-    assert self._acquired
-    self._acquired = False
-
  def UpdateJobUnlocked(self, job, replicate=True):
-    assert self._acquired, "Lock not acquired while updating job"
    self._updates.append((job, bool(replicate)))

  def SubmitManyJobs(self, jobs):
-    assert not self._acquired, "Lock acquired while submitting jobs"
    job_ids = [self._submit_count.next() for _ in jobs]
    self._submitted.extend(zip(job_ids, jobs))
    return job_ids
@@ -563,8 +549,6 @@ class _FakeExecOpCodeForProc:

  def __call__(self, op, cbs, timeout=None):
    assert isinstance(op, opcodes.OpTestDummy)
-    assert not self._queue.IsAcquired(), \
-           "Queue lock not released when executing opcode"

    if self._before_start:
      self._before_start(timeout, cbs.CurrentPriority())
@@ -574,9 +558,6 @@ class _FakeExecOpCodeForProc:
    if self._after_start:
      self._after_start(op, cbs)

-    # Check again after the callbacks
-    assert not self._queue.IsAcquired()
-
    if op.fail:
      raise errors.OpExecError("Error requested (%s)" % op.result)

@@ -620,7 +601,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
      def _BeforeStart(timeout, priority):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)
-        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
        self.assertFalse(job.cur_opctx)

@@ -628,7 +608,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
        self.assertEqual(queue.GetNextUpdate(), (job, True))
        self.assertRaises(IndexError, queue.GetNextUpdate)

-        self.assertFalse(queue.IsAcquired())
        self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
        self.assertFalse(job.cur_opctx)

@@ -820,7 +799,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
@@ -834,7 +812,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
@@ -865,7 +842,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)

    def _BeforeStart(timeout, priority):
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

      # Mark as cancelled
@@ -1049,13 +1025,11 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    def _AfterStart(op, cbs):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

      self.assertRaises(AssertionError, cbs.Feedback,
@@ -1140,7 +1114,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
    def _BeforeStart(timeout, priority):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

@@ -1148,7 +1121,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

@@ -1217,7 +1189,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

@@ -1225,7 +1196,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

@@ -1336,7 +1306,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

@@ -1344,7 +1313,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

@@ -1445,7 +1413,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
        # Job should only be updated when it wasn't waiting for another job
        self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)
-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
      self.assertFalse(job.cur_opctx)

@@ -1453,7 +1420,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
      self.assertEqual(queue.GetNextUpdate(), (job, True))
      self.assertRaises(IndexError, queue.GetNextUpdate)

-      self.assertFalse(queue.IsAcquired())
      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
      self.assertFalse(job.cur_opctx)

@@ -1570,7 +1536,6 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
      self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

-    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)

    ts = self.timeout_strategy
@@ -1608,7 +1573,6 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
    self.assertEqual(self.queue.GetNextUpdate(), (job, True))
    self.assertRaises(IndexError, self.queue.GetNextUpdate)

-    self.assertFalse(self.queue.IsAcquired())
    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)

    # Job is running, cancelling shouldn't be possible
@@ -1765,7 +1729,6 @@ class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
    self.opexecprio = []

  def _BeforeStart(self, timeout, priority):
-    self.assertFalse(self.queue.IsAcquired())
    self.opexecprio.append(priority)

  def testChangePriorityWhileRunning(self):
--
2.2.0.rc0.207.ga3a616c


LGTM

Reply via email to