On Thu, Sep 23, 2010 at 3:05 PM, Michael Hanselmann <[email protected]> wrote:
> As already noted in the design document, an opcode's priority is
> increased when the lock(s) can't be acquired within a certain amount of
> time, except at the highest priority, where in such a case a blocking
> acquire is used.
>
> A unittest is provided. Priorities are not yet used for acquiring the
> lock(s)—this will need further changes on mcpu.
>
> Signed-off-by: Michael Hanselmann <[email protected]>
> ---
>  daemons/ganeti-masterd         |    3 +
>  lib/jqueue.py                  |  212 +++++++++++++++++++++++++++++++---------
>  test/ganeti.jqueue_unittest.py |  201 ++++++++++++++++++++++++++++++++++++-
>  3 files changed, 362 insertions(+), 54 deletions(-)
>
> diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd
> index d627c47..8c32504 100755
> --- a/daemons/ganeti-masterd
> +++ b/daemons/ganeti-masterd
> @@ -314,6 +314,9 @@ class ClientOps:
>     """
>     # Queries don't have a job id
>     proc = mcpu.Processor(self.server.context, None)
> +
> +    # TODO: Executing an opcode using locks will acquire them in blocking mode.
> +    # Consider using a timeout for retries.
>     return proc.ExecOpCode(op, None)
>
>
> diff --git a/lib/jqueue.py b/lib/jqueue.py
> index c7fd4c7..7411f46 100644
> --- a/lib/jqueue.py
> +++ b/lib/jqueue.py
> @@ -176,7 +176,7 @@ class _QueuedJob(object):
>
>   """
>   # pylint: disable-msg=W0212
> -  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter",
> +  __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx",
>                "received_timestamp", "start_timestamp", "end_timestamp",
>                "__weakref__"]
>
> @@ -211,6 +211,7 @@ class _QueuedJob(object):
>
>     """
>     obj.ops_iter = None
> +    obj.cur_opctx = None
>
>   def __repr__(self):
>     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
> @@ -745,8 +746,40 @@ def _EncodeOpError(err):
>   return errors.EncodeException(to_encode)
>
>
> +class _TimeoutStrategyWrapper:
> +  def __init__(self, fn):
> +    """Initializes this class.
> +
> +    """
> +    self._fn = fn
> +    self._next = None
> +
> +  def _Advance(self):
> +    """Gets the next timeout if necessary.
> +
> +    """
> +    if self._next is None:
> +      self._next = self._fn()
> +
> +  def Peek(self):
> +    """Returns the next timeout.
> +
> +    """
> +    self._Advance()
> +    return self._next
> +
> +  def Next(self):
> +    """Returns the current timeout and advances the internal state.
> +
> +    """
> +    self._Advance()
> +    result = self._next
> +    self._next = None
> +    return result
> +
> +
>  class _OpExecContext:
> -  def __init__(self, op, index, log_prefix):
> +  def __init__(self, op, index, log_prefix, timeout_strategy_factory):
>     """Initializes this class.
>
>     """
> @@ -755,22 +788,60 @@ class _OpExecContext:
>     self.log_prefix = log_prefix
>     self.summary = op.input.Summary()
>
> +    self._timeout_strategy_factory = timeout_strategy_factory
> +    self._ResetTimeoutStrategy()
> +
> +  def _ResetTimeoutStrategy(self):
> +    """Creates a new timeout strategy.
> +
> +    """
> +    self._timeout_strategy = \
> +      _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt)
> +
> +  def CheckPriorityIncrease(self):
> +    """Checks whether priority can and should be increased.
> +
> +    Called when locks couldn't be acquired.
> +
> +    """
> +    op = self.op
> +
> +    # Exhausted all retries and next round should not use blocking acquire
> +    # for locks?
> +    if (self._timeout_strategy.Peek() is None and
> +        op.priority > constants.OP_PRIO_HIGHEST):
> +      logging.debug("Increasing priority")
> +      op.priority -= 1
> +      self._ResetTimeoutStrategy()
> +      return True
> +
> +    return False
> +
> +  def GetNextLockTimeout(self):
> +    """Returns the next lock acquire timeout.
> +
> +    """
> +    return self._timeout_strategy.Next()
> +
>
>  class _JobProcessor(object):
> -  def __init__(self, queue, opexec_fn, job):
> +  def __init__(self, queue, opexec_fn, job,
> +               _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy):
>     """Initializes this class.
>
>     """
>     self.queue = queue
>     self.opexec_fn = opexec_fn
>     self.job = job
> +    self._timeout_strategy_factory = _timeout_strategy_factory
>
>   @staticmethod
> -  def _FindNextOpcode(job):
> +  def _FindNextOpcode(job, timeout_strategy_factory):
>     """Locates the next opcode to run.
>
>     @type job: L{_QueuedJob}
>     @param job: Job object
> +    @param timeout_strategy_factory: Callable to create new timeout strategy
>
>     """
>     # Create some sort of a cache to speed up locating next opcode for future
> @@ -791,7 +862,8 @@ class _JobProcessor(object):
>         # Found an opcode already marked as running
>         raise errors.ProgrammerError("Called for job marked as running")
>
> -      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
> +      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
> +                             timeout_strategy_factory)
>
>       if op.status == constants.OP_STATUS_CANCELED:
>         # Cancelled jobs are handled by the caller
> @@ -838,10 +910,18 @@ class _JobProcessor(object):
>
>     assert op.status == constants.OP_STATUS_WAITLOCK
>
> +    timeout = opctx.GetNextLockTimeout()
> +
>     try:
>       # Make sure not to hold queue lock while calling ExecOpCode
>       result = self.opexec_fn(op.input,
> -                              _OpExecCallbacks(self.queue, self.job, op))
> +                              _OpExecCallbacks(self.queue, self.job, op),
> +                              timeout=timeout)
> +    except mcpu.LockAcquireTimeout:
> +      assert timeout is not None, "Received timeout for blocking acquire"
> +      logging.debug("Couldn't acquire locks in %0.6fs", timeout)
> +      assert op.status == constants.OP_STATUS_WAITLOCK
> +      return (constants.OP_STATUS_QUEUED, None)
>     except CancelJob:
>       logging.exception("%s: Canceling job", opctx.log_prefix)
>       assert op.status == constants.OP_STATUS_CANCELING
> @@ -855,9 +935,10 @@ class _JobProcessor(object):
>                     opctx.log_prefix, opctx.summary)
>       return (constants.OP_STATUS_SUCCESS, result)
>
> -  def __call__(self):
> +  def __call__(self, _nextop_fn=None):
>     """Continues execution of a job.
>
> +    @param _nextop_fn: Callback function for tests
>     @rtype: bool
>     @return: True if job is finished, False if processor needs to be called
>              again
> @@ -872,7 +953,14 @@ class _JobProcessor(object):
>     try:
>       opcount = len(job.ops)
>
> -      opctx = self._FindNextOpcode(job)
> +      # Is a previous opcode still pending?
> +      if job.cur_opctx:
> +        opctx = job.cur_opctx
> +      else:
> +        if __debug__ and _nextop_fn:
> +          _nextop_fn()
> +        opctx = self._FindNextOpcode(job, self._timeout_strategy_factory)
> +
>       op = opctx.op
>
>       # Consistency check
> @@ -884,6 +972,9 @@ class _JobProcessor(object):
>                            constants.OP_STATUS_WAITLOCK,
>                            constants.OP_STATUS_CANCELED)
>
> +      assert (op.priority <= constants.OP_PRIO_LOWEST and
> +              op.priority >= constants.OP_PRIO_HIGHEST)
> +
>       if op.status != constants.OP_STATUS_CANCELED:
>         # Prepare to start opcode
>         self._MarkWaitlock(job, op)
> @@ -903,62 +994,87 @@ class _JobProcessor(object):
>         finally:
>           queue.acquire(shared=1)
>
> -        # Finalize opcode
> -        op.end_timestamp = TimeStampNow()
>         op.status = op_status
>         op.result = op_result
>
> -        if op.status == constants.OP_STATUS_CANCELING:
> -          assert not compat.any(i.status != constants.OP_STATUS_CANCELING
> -                                for i in job.ops[opctx.index:])
> +        if op.status == constants.OP_STATUS_QUEUED:
> +          # Couldn't get locks in time
> +          assert not op.end_timestamp
>         else:
> -          assert op.status in constants.OPS_FINALIZED
> +          # Finalize opcode
> +          op.end_timestamp = TimeStampNow()
>
> -      # Ensure all opcodes so far have been successful
> -      assert (opctx.index == 0 or
> -              compat.all(i.status == constants.OP_STATUS_SUCCESS
> -                         for i in job.ops[:opctx.index]))
> +          if op.status == constants.OP_STATUS_CANCELING:
> +            assert not compat.any(i.status != constants.OP_STATUS_CANCELING
> +                                  for i in job.ops[opctx.index:])
> +          else:
> +            assert op.status in constants.OPS_FINALIZED
>
> -      if op.status == constants.OP_STATUS_SUCCESS:
> +      if op.status == constants.OP_STATUS_QUEUED:
>         finalize = False
>
> -      elif op.status == constants.OP_STATUS_ERROR:
> -        # Ensure failed opcode has an exception as its result
> -        assert errors.GetEncodedError(job.ops[opctx.index].result)
> +        opctx.CheckPriorityIncrease()
>
> -        to_encode = errors.OpExecError("Preceding opcode failed")
> -        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
> -                              _EncodeOpError(to_encode))
> -        finalize = True
> +        # Keep around for another round
> +        job.cur_opctx = opctx
>
> -        # Consistency check
> -        assert compat.all(i.status == constants.OP_STATUS_ERROR and
> -                          errors.GetEncodedError(i.result)
> -                          for i in job.ops[opctx.index:])
> +        assert (op.priority <= constants.OP_PRIO_LOWEST and
> +                op.priority >= constants.OP_PRIO_HIGHEST)
>
> -      elif op.status == constants.OP_STATUS_CANCELING:
> -        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
> -                              "Job canceled by request")
> -        finalize = True
> +        # In no case must the status be finalized here
> +        assert job.CalcStatus() == constants.JOB_STATUS_QUEUED
>
> -      elif op.status == constants.OP_STATUS_CANCELED:
> -        finalize = True
> +        queue.UpdateJobUnlocked(job)
>
>       else:
> -        raise errors.ProgrammerError("Unknown status '%s'" % op.status)
> +        # Ensure all opcodes so far have been successful
> +        assert (opctx.index == 0 or
> +                compat.all(i.status == constants.OP_STATUS_SUCCESS
> +                           for i in job.ops[:opctx.index]))
> +
> +        # Reset context
> +        job.cur_opctx = None
> +
> +        if op.status == constants.OP_STATUS_SUCCESS:
> +          finalize = False
> +
> +        elif op.status == constants.OP_STATUS_ERROR:
> +          # Ensure failed opcode has an exception as its result
> +          assert errors.GetEncodedError(job.ops[opctx.index].result)
> +
> +          to_encode = errors.OpExecError("Preceding opcode failed")
> +          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
> +                                _EncodeOpError(to_encode))
> +          finalize = True
>
> -      # Finalizing or last opcode?
> -      if finalize or opctx.index == (opcount - 1):
> -        # All opcodes have been run, finalize job
> -        job.end_timestamp = TimeStampNow()
> +          # Consistency check
> +          assert compat.all(i.status == constants.OP_STATUS_ERROR and
> +                            errors.GetEncodedError(i.result)
> +                            for i in job.ops[opctx.index:])
>
> -      # Write to disk. If the job status is final, this is the final write
> -      # allowed. Once the file has been written, it can be archived anytime.
> -      queue.UpdateJobUnlocked(job)
> +        elif op.status == constants.OP_STATUS_CANCELING:
> +          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
> +                                "Job canceled by request")
> +          finalize = True
> +
> +        elif op.status == constants.OP_STATUS_CANCELED:
> +          finalize = True
> +
> +        else:
> +          raise errors.ProgrammerError("Unknown status '%s'" % op.status)
> +
> +        # Finalizing or last opcode?
> +        if finalize or opctx.index == (opcount - 1):
> +          # All opcodes have been run, finalize job
> +          job.end_timestamp = TimeStampNow()
> +
> +        # Write to disk. If the job status is final, this is the final write
> +        # allowed. Once the file has been written, it can be archived anytime.
> +        queue.UpdateJobUnlocked(job)
>
> -      if finalize or opctx.index == (opcount - 1):
> -        logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
> -        return True
> +        if finalize or opctx.index == (opcount - 1):
> +          logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
> +          return True
>
>       return False
>     finally:
> @@ -988,7 +1104,7 @@ class _JobQueueWorker(workerpool.BaseWorker):
>
>     if not _JobProcessor(queue, proc.ExecOpCode, job)():
>       # Schedule again
> -      raise workerpool.DeferTask()
> +      raise workerpool.DeferTask(priority=job.CalcPriority())
>
>
>  class _JobQueueWorkerPool(workerpool.WorkerPool):
> diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
> index d573378..ba56ae5 100755
> --- a/test/ganeti.jqueue_unittest.py
> +++ b/test/ganeti.jqueue_unittest.py
> @@ -27,6 +27,7 @@ import unittest
>  import tempfile
>  import shutil
>  import errno
> +import itertools
>
>  from ganeti import constants
>  from ganeti import utils
> @@ -34,6 +35,7 @@ from ganeti import errors
>  from ganeti import jqueue
>  from ganeti import opcodes
>  from ganeti import compat
> +from ganeti import mcpu
>
>  import testutils
>
> @@ -447,11 +449,11 @@ class _FakeExecOpCodeForProc:
>     self._before_start = before_start
>     self._after_start = after_start
>
> -  def __call__(self, op, cbs):
> +  def __call__(self, op, cbs, timeout=None):
>     assert isinstance(op, opcodes.OpTestDummy)
>
>     if self._before_start:
> -      self._before_start()
> +      self._before_start(timeout)
>
>     cbs.NotifyStart()
>
> @@ -464,7 +466,7 @@ class _FakeExecOpCodeForProc:
>     return op.result
>
>
> -class TestJobProcessor(unittest.TestCase):
> +class _JobProcessorTestUtils:
>   def _CreateJob(self, queue, job_id, ops):
>     job = jqueue._QueuedJob(queue, job_id, ops)
>     self.assertFalse(job.start_timestamp)
> @@ -475,6 +477,8 @@ class TestJobProcessor(unittest.TestCase):
>     self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in 
> ops]])
>     return job
>
> +
> +class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
>   def _GenericCheckJob(self, job):
>     assert compat.all(isinstance(op.input, opcodes.OpTestDummy)
>                       for op in job.ops)
> @@ -502,7 +506,7 @@ class TestJobProcessor(unittest.TestCase):
>       # Create job
>       job = self._CreateJob(queue, job_id, ops)
>
> -      def _BeforeStart():
> +      def _BeforeStart(_):
>         self.assertFalse(queue.IsAcquired())
>         self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
>
> @@ -656,7 +660,7 @@ class TestJobProcessor(unittest.TestCase):
>
>     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
>
> -    def _BeforeStart():
> +    def _BeforeStart(_):
>       self.assertFalse(queue.IsAcquired())
>       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
>
> @@ -845,7 +849,7 @@ class TestJobProcessor(unittest.TestCase):
>     # Create job
>     job = self._CreateJob(queue, 29386, ops)
>
> -    def _BeforeStart():
> +    def _BeforeStart(_):
>       self.assertFalse(queue.IsAcquired())
>       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
>
> @@ -922,5 +926,190 @@ class TestJobProcessor(unittest.TestCase):
>     self.assertFalse(job.GetLogEntries(count + 3))
>
>
> +class _FakeTimeoutStrategy:
> +  def __init__(self, timeouts):
> +    self.timeouts = timeouts
> +    self.attempts = 0
> +    self.last_timeout = None
> +
> +  def NextAttempt(self):
> +    self.attempts += 1
> +    if self.timeouts:
> +      timeout = self.timeouts.pop(0)
> +    else:
> +      timeout = None
> +    self.last_timeout = timeout
> +    return timeout
> +
> +
> +class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
> +  def setUp(self):
> +    self.queue = _FakeQueueForProc()
> +    self.job = None
> +    self.curop = None
> +    self.opcounter = None
> +    self.timeout_strategy = None
> +    self.retries = 0
> +    self.prev_tsop = None
> +    self.prev_prio = None
> +    self.gave_lock = None
> +    self.done_lock_before_blocking = False
> +
> +  def _BeforeStart(self, timeout):
> +    job = self.job
> +
> +    self.assertFalse(self.queue.IsAcquired())
> +    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK)
> +
> +    ts = self.timeout_strategy
> +
> +    self.assert_(timeout is None or isinstance(timeout, (int, float)))
> +    self.assertEqual(timeout, ts.last_timeout)
> +
> +    self.gave_lock = True
> +
> +    if (self.curop == 3 and
> +        job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3):
> +      # Give locks before running into blocking acquire
> +      assert self.retries == 7
> +      self.retries = 0
> +      self.done_lock_before_blocking = True
> +      return
> +
> +    if self.retries > 0:
> +      self.assert_(timeout is not None)
> +      self.retries -= 1
> +      self.gave_lock = False
> +      raise mcpu.LockAcquireTimeout()
> +
> +    if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST:
> +      assert self.retries == 0, "Didn't exhaust all retries at highest priority"
> +      assert not ts.timeouts
> +      self.assert_(timeout is None)
> +
> +  def _AfterStart(self, op, cbs):
> +    job = self.job
> +
> +    self.assertFalse(self.queue.IsAcquired())
> +    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
> +
> +    # Job is running, cancelling shouldn't be possible
> +    (success, _) = job.Cancel()
> +    self.assertFalse(success)
> +
> +  def _NextOpcode(self):
> +    self.curop = self.opcounter.next()
> +    self.prev_prio = self.job.ops[self.curop].priority
> +
> +  def _NewTimeoutStrategy(self):
> +    job = self.job
> +
> +    self.assertEqual(self.retries, 0)
> +
> +    if self.prev_tsop == self.curop:
> +      # Still on the same opcode, priority must've been increased
> +      self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1)
> +
> +    if self.curop == 1:
> +      # Normal retry
> +      timeouts = range(10, 31, 10)
> +      self.retries = len(timeouts) - 1
> +
> +    elif self.curop == 2:
> +      # Let this run into a blocking acquire
> +      timeouts = range(11, 61, 12)
> +      self.retries = len(timeouts)
> +
> +    elif self.curop == 3:
> +      # Wait for priority to increase, but give lock before blocking acquire
> +      timeouts = range(12, 100, 14)
> +      self.retries = len(timeouts)
> +
> +      self.assertFalse(self.done_lock_before_blocking)
> +
> +    elif self.curop == 4:
> +      self.assert_(self.done_lock_before_blocking)
> +
> +      # Timeouts, but no need to retry
> +      timeouts = range(10, 31, 10)
> +      self.retries = 0
> +
> +    elif self.curop == 5:
> +      # Normal retry
> +      timeouts = range(19, 100, 11)
> +      self.retries = len(timeouts)
> +
> +    else:
> +      timeouts = []
> +      self.retries = 0
> +
> +    assert len(job.ops) == 10
> +    assert self.retries <= len(timeouts)
> +
> +    ts = _FakeTimeoutStrategy(timeouts)
> +
> +    self.timeout_strategy = ts
> +    self.prev_tsop = self.curop
> +    self.prev_prio = job.ops[self.curop].priority
> +
> +    return ts
> +
> +  def testTimeout(self):
> +    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
> +           for i in range(10)]
> +
> +    # Create job
> +    job_id = 15801
> +    job = self._CreateJob(self.queue, job_id, ops)
> +    self.job = job
> +
> +    self.opcounter = itertools.count(0)
> +
> +    opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart)
> +    tsf = self._NewTimeoutStrategy
> +
> +    self.assertFalse(self.done_lock_before_blocking)
> +
> +    for i in itertools.count(0):
> +      proc = jqueue._JobProcessor(self.queue, opexec, job,
> +                                  _timeout_strategy_factory=tsf)
> +
> +      result = proc(_nextop_fn=self._NextOpcode)
> +      if result:
> +        self.assertFalse(job.cur_opctx)
> +        break
> +
> +      self.assertFalse(result)
> +
> +      if self.gave_lock:
> +        self.assertFalse(job.cur_opctx)
> +      else:
> +        self.assert_(job.cur_opctx)
> +        self.assertEqual(job.cur_opctx._timeout_strategy._fn,
> +                         self.timeout_strategy.NextAttempt)
> +
> +      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
> +      self.assert_(job.start_timestamp)
> +      self.assertFalse(job.end_timestamp)
> +
> +    self.assertEqual(self.curop, len(job.ops) - 1)
> +    self.assertEqual(self.job, job)
> +    self.assertEqual(self.opcounter.next(), len(job.ops))
> +    self.assert_(self.done_lock_before_blocking)
> +
> +    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
> +    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
> +    self.assertEqual(job.GetInfo(["opresult"]),
> +                     [[op.input.result for op in job.ops]])
> +    self.assertEqual(job.GetInfo(["opstatus"]),
> +                     [len(job.ops) * [constants.OP_STATUS_SUCCESS]])
> +    self.assert_(compat.all(op.start_timestamp and op.end_timestamp
> +                            for op in job.ops))
> +
> +    # Finished jobs can't be processed any further
> +    self.assertRaises(errors.ProgrammerError,
> +                      jqueue._JobProcessor(self.queue, opexec, job))
> +
> +
>  if __name__ == "__main__":
>   testutils.GanetiTestProgram()
> --
> 1.7.0.4

LGTM

>
>

Reply via email to