As already noted in the design document, an opcode's priority is
increased when the lock(s) can't be acquired within a certain amount of
time, except at the highest priority, where in such a case a blocking
acquire is used.

A unittest is provided. Priorities are not yet used for acquiring the
lock(s)—this will need further changes on mcpu.

Signed-off-by: Michael Hanselmann <[email protected]>
---
 daemons/ganeti-masterd         |    3 +
 lib/jqueue.py                  |  212 +++++++++++++++++++++++++++++++---------
 test/ganeti.jqueue_unittest.py |  201 ++++++++++++++++++++++++++++++++++++-
 3 files changed, 362 insertions(+), 54 deletions(-)

diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
index d627c47..8c32504 100755
--- a/daemons/ganeti-masterd
+++ b/daemons/ganeti-masterd
@@ -314,6 +314,9 @@ class ClientOps:
     """
     # Queries don't have a job id
     proc = mcpu.Processor(self.server.context, None)
+
+    # TODO: Executing an opcode using locks will acquire them in blocking mode.
+    # Consider using a timeout for retries.
     return proc.ExecOpCode(op, None)
 
 
diff --git a/lib/jqueue.py b/lib/jqueue.py
index c7fd4c7..7411f46 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -176,7 +176,7 @@ class _QueuedJob(object):
 
   """
   # pylint: disable-msg=W0212
-  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter",
+  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
                "received_timestamp", "start_timestamp", "end_timestamp",
                "__weakref__"]
 
@@ -211,6 +211,7 @@ class _QueuedJob(object):
 
     """
     obj.ops_iter = None
+    obj.cur_opctx = None
 
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
@@ -745,8 +746,40 @@ def _EncodeOpError(err):
   return errors.EncodeException(to_encode)
 
 
+class _TimeoutStrategyWrapper:
+  def __init__(self, fn):
+    """Initializes this class.
+
+    """
+    self._fn = fn
+    self._next = None
+
+  def _Advance(self):
+    """Gets the next timeout if necessary.
+
+    """
+    if self._next is None:
+      self._next = self._fn()
+
+  def Peek(self):
+    """Returns the next timeout.
+
+    """
+    self._Advance()
+    return self._next
+
+  def Next(self):
+    """Returns the current timeout and advances the internal state.
+
+    """
+    self._Advance()
+    result = self._next
+    self._next = None
+    return result
+
+
 class _OpExecContext:
-  def __init__(self, op, index, log_prefix):
+  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
     """Initializes this class.
 
     """
@@ -755,22 +788,60 @@ class _OpExecContext:
     self.log_prefix = log_prefix
     self.summary = op.input.Summary()
 
+    self._timeout_strategy_factory = timeout_strategy_factory
+    self._ResetTimeoutStrategy()
+
+  def _ResetTimeoutStrategy(self):
+    """Creates a new timeout strategy.
+
+    """
+    self._timeout_strategy = \
+      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
+
+  def CheckPriorityIncrease(self):
+    """Checks whether priority can and should be increased.
+
+    Called when locks couldn't be acquired.
+
+    """
+    op = self.op
+
+    # Exhausted all retries and next round should not use blocking acquire
+    # for locks?
+    if (self._timeout_strategy.Peek() is None and
+        op.priority > constants.OP_PRIO_HIGHEST):
+      logging.debug("Increasing priority")
+      op.priority -= 1
+      self._ResetTimeoutStrategy()
+      return True
+
+    return False
+
+  def GetNextLockTimeout(self):
+    """Returns the next lock acquire timeout.
+
+    """
+    return self._timeout_strategy.Next()
+
 
 class _JobProcessor(object):
-  def __init__(self, queue, opexec_fn, job):
+  def __init__(self, queue, opexec_fn, job,
+               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
     """Initializes this class.
 
     """
     self.queue = queue
     self.opexec_fn = opexec_fn
     self.job = job
+    self._timeout_strategy_factory = _timeout_strategy_factory
 
   @staticmethod
-  def _FindNextOpcode(job):
+  def _FindNextOpcode(job, timeout_strategy_factory):
     """Locates the next opcode to run.
 
     @type job: L{_QueuedJob}
     @param job: Job object
+    @param timeout_strategy_factory: Callable to create new timeout strategy
 
     """
     # Create some sort of a cache to speed up locating next opcode for future
@@ -791,7 +862,8 @@ class _JobProcessor(object):
         # Found an opcode already marked as running
         raise errors.ProgrammerError("Called for job marked as running")
 
-      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
+      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
+                             timeout_strategy_factory)
 
       if op.status == constants.OP_STATUS_CANCELED:
         # Cancelled jobs are handled by the caller
@@ -838,10 +910,18 @@ class _JobProcessor(object):
 
     assert op.status == constants.OP_STATUS_WAITLOCK
 
+    timeout = opctx.GetNextLockTimeout()
+
     try:
       # Make sure not to hold queue lock while calling ExecOpCode
       result = self.opexec_fn(op.input,
-                              _OpExecCallbacks(self.queue, self.job, op))
+                              _OpExecCallbacks(self.queue, self.job, op),
+                              timeout=timeout)
+    except mcpu.LockAcquireTimeout:
+      assert timeout is not None, "Received timeout for blocking acquire"
+      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
+      assert op.status == constants.OP_STATUS_WAITLOCK
+      return (constants.OP_STATUS_QUEUED, None)
     except CancelJob:
       logging.exception("%s: Canceling job", opctx.log_prefix)
       assert op.status == constants.OP_STATUS_CANCELING
@@ -855,9 +935,10 @@ class _JobProcessor(object):
                     opctx.log_prefix, opctx.summary)
       return (constants.OP_STATUS_SUCCESS, result)
 
-  def __call__(self):
+  def __call__(self, _nextop_fn=None):
     """Continues execution of a job.
 
+    @param _nextop_fn: Callback function for tests
     @rtype: bool
     @return: True if job is finished, False if processor needs to be called
              again
@@ -872,7 +953,14 @@ class _JobProcessor(object):
     try:
       opcount = len(job.ops)
 
-      opctx = self._FindNextOpcode(job)
+      # Is a previous opcode still pending?
+      if job.cur_opctx:
+        opctx = job.cur_opctx
+      else:
+        if __debug__ and _nextop_fn:
+          _nextop_fn()
+        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
+
       op = opctx.op
 
       # Consistency check
@@ -884,6 +972,9 @@ class _JobProcessor(object):
                            constants.OP_STATUS_WAITLOCK,
                            constants.OP_STATUS_CANCELED)
 
+      assert (op.priority <= constants.OP_PRIO_LOWEST and
+              op.priority >= constants.OP_PRIO_HIGHEST)
+
       if op.status != constants.OP_STATUS_CANCELED:
         # Prepare to start opcode
         self._MarkWaitlock(job, op)
@@ -903,62 +994,87 @@ class _JobProcessor(object):
         finally:
           queue.acquire(shared=1)
 
-        # Finalize opcode
-        op.end_timestamp = TimeStampNow()
         op.status = op_status
         op.result = op_result
 
-        if op.status == constants.OP_STATUS_CANCELING:
-          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
-                                for i in job.ops[opctx.index:])
+        if op.status == constants.OP_STATUS_QUEUED:
+          # Couldn't get locks in time
+          assert not op.end_timestamp
         else:
-          assert op.status in constants.OPS_FINALIZED
+          # Finalize opcode
+          op.end_timestamp = TimeStampNow()
 
-      # Ensure all opcodes so far have been successful
-      assert (opctx.index == 0 or
-              compat.all(i.status == constants.OP_STATUS_SUCCESS
-                         for i in job.ops[:opctx.index]))
+          if op.status == constants.OP_STATUS_CANCELING:
+            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
+                                  for i in job.ops[opctx.index:])
+          else:
+            assert op.status in constants.OPS_FINALIZED
 
-      if op.status == constants.OP_STATUS_SUCCESS:
+      if op.status == constants.OP_STATUS_QUEUED:
         finalize = False
 
-      elif op.status == constants.OP_STATUS_ERROR:
-        # Ensure failed opcode has an exception as its result
-        assert errors.GetEncodedError(job.ops[opctx.index].result)
+        opctx.CheckPriorityIncrease()
 
-        to_encode = errors.OpExecError("Preceding opcode failed")
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              _EncodeOpError(to_encode))
-        finalize = True
+        # Keep around for another round
+        job.cur_opctx = opctx
 
-        # Consistency check
-        assert compat.all(i.status == constants.OP_STATUS_ERROR and
-                          errors.GetEncodedError(i.result)
-                          for i in job.ops[opctx.index:])
+        assert (op.priority <= constants.OP_PRIO_LOWEST and
+                op.priority >= constants.OP_PRIO_HIGHEST)
 
-      elif op.status == constants.OP_STATUS_CANCELING:
-        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
-                              "Job canceled by request")
-        finalize = True
+        # In no case must the status be finalized here
+        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
 
-      elif op.status == constants.OP_STATUS_CANCELED:
-        finalize = True
+        queue.UpdateJobUnlocked(job)
 
       else:
-        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
+        # Ensure all opcodes so far have been successful
+        assert (opctx.index == 0 or
+                compat.all(i.status == constants.OP_STATUS_SUCCESS
+                           for i in job.ops[:opctx.index]))
+
+        # Reset context
+        job.cur_opctx = None
+
+        if op.status == constants.OP_STATUS_SUCCESS:
+          finalize = False
+
+        elif op.status == constants.OP_STATUS_ERROR:
+          # Ensure failed opcode has an exception as its result
+          assert errors.GetEncodedError(job.ops[opctx.index].result)
+
+          to_encode = errors.OpExecError("Preceding opcode failed")
+          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
+                                _EncodeOpError(to_encode))
+          finalize = True
 
-      # Finalizing or last opcode?
-      if finalize or opctx.index == (opcount - 1):
-        # All opcodes have been run, finalize job
-        job.end_timestamp = TimeStampNow()
+          # Consistency check
+          assert compat.all(i.status == constants.OP_STATUS_ERROR and
+                            errors.GetEncodedError(i.result)
+                            for i in job.ops[opctx.index:])
 
-      # Write to disk. If the job status is final, this is the final write
-      # allowed. Once the file has been written, it can be archived anytime.
-      queue.UpdateJobUnlocked(job)
+        elif op.status == constants.OP_STATUS_CANCELING:
+          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
+                                "Job canceled by request")
+          finalize = True
+
+        elif op.status == constants.OP_STATUS_CANCELED:
+          finalize = True
+
+        else:
+          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
+
+        # Finalizing or last opcode?
+        if finalize or opctx.index == (opcount - 1):
+          # All opcodes have been run, finalize job
+          job.end_timestamp = TimeStampNow()
+
+        # Write to disk. If the job status is final, this is the final write
+        # allowed. Once the file has been written, it can be archived anytime.
+        queue.UpdateJobUnlocked(job)
 
-      if finalize or opctx.index == (opcount - 1):
-        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
-        return True
+        if finalize or opctx.index == (opcount - 1):
+          logging.info("Finished job %s, status = %s", job.id, 
job.CalcStatus())
+          return True
 
       return False
     finally:
@@ -988,7 +1104,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
 
     if not _JobProcessor(queue, proc.ExecOpCode, job)():
       # Schedule again
-      raise workerpool.DeferTask()
+      raise workerpool.DeferTask(priority=job.CalcPriority())
 
 
 class _JobQueueWorkerPool(workerpool.WorkerPool):
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index d573378..ba56ae5 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -27,6 +27,7 @@ import unittest
 import tempfile
 import shutil
 import errno
+import itertools
 
 from ganeti import constants
 from ganeti import utils
@@ -34,6 +35,7 @@ from ganeti import errors
 from ganeti import jqueue
 from ganeti import opcodes
 from ganeti import compat
+from ganeti import mcpu
 
 import testutils
 
@@ -447,11 +449,11 @@ class _FakeExecOpCodeForProc:
     self._before_start = before_start
     self._after_start = after_start
 
-  def __call__(self, op, cbs):
+  def __call__(self, op, cbs, timeout=None):
     assert isinstance(op, opcodes.OpTestDummy)
 
     if self._before_start:
-      self._before_start()
+      self._before_start(timeout)
 
     cbs.NotifyStart()
 
@@ -464,7 +466,7 @@ class _FakeExecOpCodeForProc:
     return op.result
 
 
-class TestJobProcessor(unittest.TestCase):
+class _JobProcessorTestUtils:
   def _CreateJob(self, queue, job_id, ops):
     job = jqueue._QueuedJob(queue, job_id, ops)
     self.assertFalse(job.start_timestamp)
@@ -475,6 +477,8 @@ class TestJobProcessor(unittest.TestCase):
     self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]])
     return job
 
+
+class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
   def _GenericCheckJob(self, job):
     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
                       for op in job.ops)
@@ -502,7 +506,7 @@ class TestJobProcessor(unittest.TestCase):
       # Create job
       job = self._CreateJob(queue, job_id, ops)
 
-      def _BeforeStart():
+      def _BeforeStart(_):
         self.assertFalse(queue.IsAcquired())
         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -656,7 +660,7 @@ class TestJobProcessor(unittest.TestCase):
 
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
-    def _BeforeStart():
+    def _BeforeStart(_):
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -845,7 +849,7 @@ class TestJobProcessor(unittest.TestCase):
     # Create job
     job = self._CreateJob(queue, 29386, ops)
 
-    def _BeforeStart():
+    def _BeforeStart(_):
       self.assertFalse(queue.IsAcquired())
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
 
@@ -922,5 +926,190 @@ class TestJobProcessor(unittest.TestCase):
     self.assertFalse(job.GetLogEntries(count + 3))
 
 
+class _FakeTimeoutStrategy:
+  def __init__(self, timeouts):
+    self.timeouts = timeouts
+    self.attempts = 0
+    self.last_timeout = None
+
+  def NextAttempt(self):
+    self.attempts += 1
+    if self.timeouts:
+      timeout = self.timeouts.pop(0)
+    else:
+      timeout = None
+    self.last_timeout = timeout
+    return timeout
+
+
+class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
+  def setUp(self):
+    self.queue = _FakeQueueForProc()
+    self.job = None
+    self.curop = None
+    self.opcounter = None
+    self.timeout_strategy = None
+    self.retries = 0
+    self.prev_tsop = None
+    self.prev_prio = None
+    self.gave_lock = None
+    self.done_lock_before_blocking = False
+
+  def _BeforeStart(self, timeout):
+    job = self.job
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
+
+    ts = self.timeout_strategy
+
+    self.assert_(timeout is None or isinstance(timeout, (int, float)))
+    self.assertEqual(timeout, ts.last_timeout)
+
+    self.gave_lock = True
+
+    if (self.curop == 3 and
+        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
+      # Give locks before running into blocking acquire
+      assert self.retries == 7
+      self.retries = 0
+      self.done_lock_before_blocking = True
+      return
+
+    if self.retries > 0:
+      self.assert_(timeout is not None)
+      self.retries -= 1
+      self.gave_lock = False
+      raise mcpu.LockAcquireTimeout()
+
+    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
+      assert self.retries == 0, "Didn't exhaust all retries at highest 
priority"
+      assert not ts.timeouts
+      self.assert_(timeout is None)
+
+  def _AfterStart(self, op, cbs):
+    job = self.job
+
+    self.assertFalse(self.queue.IsAcquired())
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
+
+    # Job is running, cancelling shouldn't be possible
+    (success, _) = job.Cancel()
+    self.assertFalse(success)
+
+  def _NextOpcode(self):
+    self.curop = self.opcounter.next()
+    self.prev_prio = self.job.ops[self.curop].priority
+
+  def _NewTimeoutStrategy(self):
+    job = self.job
+
+    self.assertEqual(self.retries, 0)
+
+    if self.prev_tsop == self.curop:
+      # Still on the same opcode, priority must've been increased
+      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
+
+    if self.curop == 1:
+      # Normal retry
+      timeouts = range(10, 31, 10)
+      self.retries = len(timeouts) - 1
+
+    elif self.curop == 2:
+      # Let this run into a blocking acquire
+      timeouts = range(11, 61, 12)
+      self.retries = len(timeouts)
+
+    elif self.curop == 3:
+      # Wait for priority to increase, but give lock before blocking acquire
+      timeouts = range(12, 100, 14)
+      self.retries = len(timeouts)
+
+      self.assertFalse(self.done_lock_before_blocking)
+
+    elif self.curop == 4:
+      self.assert_(self.done_lock_before_blocking)
+
+      # Timeouts, but no need to retry
+      timeouts = range(10, 31, 10)
+      self.retries = 0
+
+    elif self.curop == 5:
+      # Normal retry
+      timeouts = range(19, 100, 11)
+      self.retries = len(timeouts)
+
+    else:
+      timeouts = []
+      self.retries = 0
+
+    assert len(job.ops) == 10
+    assert self.retries <= len(timeouts)
+
+    ts = _FakeTimeoutStrategy(timeouts)
+
+    self.timeout_strategy = ts
+    self.prev_tsop = self.curop
+    self.prev_prio = job.ops[self.curop].priority
+
+    return ts
+
+  def testTimeout(self):
+    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
+           for i in range(10)]
+
+    # Create job
+    job_id = 15801
+    job = self._CreateJob(self.queue, job_id, ops)
+    self.job = job
+
+    self.opcounter = itertools.count(0)
+
+    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
+    tsf = self._NewTimeoutStrategy
+
+    self.assertFalse(self.done_lock_before_blocking)
+
+    for i in itertools.count(0):
+      proc = jqueue._JobProcessor(self.queue, opexec, job,
+                                  _timeout_strategy_factory=tsf)
+
+      result = proc(_nextop_fn=self._NextOpcode)
+      if result:
+        self.assertFalse(job.cur_opctx)
+        break
+
+      self.assertFalse(result)
+
+      if self.gave_lock:
+        self.assertFalse(job.cur_opctx)
+      else:
+        self.assert_(job.cur_opctx)
+        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
+                         self.timeout_strategy.NextAttempt)
+
+      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
+      self.assert_(job.start_timestamp)
+      self.assertFalse(job.end_timestamp)
+
+    self.assertEqual(self.curop, len(job.ops) - 1)
+    self.assertEqual(self.job, job)
+    self.assertEqual(self.opcounter.next(), len(job.ops))
+    self.assert_(self.done_lock_before_blocking)
+
+    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
+    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
+    self.assertEqual(job.GetInfo(["opresult"]),
+                     [[op.input.result for op in job.ops]])
+    self.assertEqual(job.GetInfo(["opstatus"]),
+                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
+    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
+                            for op in job.ops))
+
+    # Finished jobs can't be processed any further
+    self.assertRaises(errors.ProgrammerError,
+                      jqueue._JobProcessor(self.queue, opexec, job))
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
-- 
1.7.0.4

Reply via email to