As already noted in the design document, an opcode's priority is increased when its lock(s) cannot be acquired within a certain amount of time. The exception is the highest priority: once an opcode reaches it, a blocking acquire is used instead of further retries.
A unittest is provided. Priorities are not yet used for acquiring the lock(s)—this will need further changes on mcpu. Signed-off-by: Michael Hanselmann <[email protected]> --- daemons/ganeti-masterd | 3 + lib/jqueue.py | 212 +++++++++++++++++++++++++++++++--------- test/ganeti.jqueue_unittest.py | 201 ++++++++++++++++++++++++++++++++++++- 3 files changed, 362 insertions(+), 54 deletions(-) diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd index d627c47..8c32504 100755 --- a/daemons/ganeti-masterd +++ b/daemons/ganeti-masterd @@ -314,6 +314,9 @@ class ClientOps: """ # Queries don't have a job id proc = mcpu.Processor(self.server.context, None) + + # TODO: Executing an opcode using locks will acquire them in blocking mode. + # Consider using a timeout for retries. return proc.ExecOpCode(op, None) diff --git a/lib/jqueue.py b/lib/jqueue.py index c7fd4c7..7411f46 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -176,7 +176,7 @@ class _QueuedJob(object): """ # pylint: disable-msg=W0212 - __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", + __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", "received_timestamp", "start_timestamp", "end_timestamp", "__weakref__"] @@ -211,6 +211,7 @@ class _QueuedJob(object): """ obj.ops_iter = None + obj.cur_opctx = None def __repr__(self): status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), @@ -745,8 +746,40 @@ def _EncodeOpError(err): return errors.EncodeException(to_encode) +class _TimeoutStrategyWrapper: + def __init__(self, fn): + """Initializes this class. + + """ + self._fn = fn + self._next = None + + def _Advance(self): + """Gets the next timeout if necessary. + + """ + if self._next is None: + self._next = self._fn() + + def Peek(self): + """Returns the next timeout. + + """ + self._Advance() + return self._next + + def Next(self): + """Returns the current timeout and advances the internal state. 
+ + """ + self._Advance() + result = self._next + self._next = None + return result + + class _OpExecContext: - def __init__(self, op, index, log_prefix): + def __init__(self, op, index, log_prefix, timeout_strategy_factory): """Initializes this class. """ @@ -755,22 +788,60 @@ class _OpExecContext: self.log_prefix = log_prefix self.summary = op.input.Summary() + self._timeout_strategy_factory = timeout_strategy_factory + self._ResetTimeoutStrategy() + + def _ResetTimeoutStrategy(self): + """Creates a new timeout strategy. + + """ + self._timeout_strategy = \ + _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) + + def CheckPriorityIncrease(self): + """Checks whether priority can and should be increased. + + Called when locks couldn't be acquired. + + """ + op = self.op + + # Exhausted all retries and next round should not use blocking acquire + # for locks? + if (self._timeout_strategy.Peek() is None and + op.priority > constants.OP_PRIO_HIGHEST): + logging.debug("Increasing priority") + op.priority -= 1 + self._ResetTimeoutStrategy() + return True + + return False + + def GetNextLockTimeout(self): + """Returns the next lock acquire timeout. + + """ + return self._timeout_strategy.Next() + class _JobProcessor(object): - def __init__(self, queue, opexec_fn, job): + def __init__(self, queue, opexec_fn, job, + _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): """Initializes this class. """ self.queue = queue self.opexec_fn = opexec_fn self.job = job + self._timeout_strategy_factory = _timeout_strategy_factory @staticmethod - def _FindNextOpcode(job): + def _FindNextOpcode(job, timeout_strategy_factory): """Locates the next opcode to run. 
@type job: L{_QueuedJob} @param job: Job object + @param timeout_strategy_factory: Callable to create new timeout strategy """ # Create some sort of a cache to speed up locating next opcode for future @@ -791,7 +862,8 @@ class _JobProcessor(object): # Found an opcode already marked as running raise errors.ProgrammerError("Called for job marked as running") - opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops))) + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), + timeout_strategy_factory) if op.status == constants.OP_STATUS_CANCELED: # Cancelled jobs are handled by the caller @@ -838,10 +910,18 @@ class _JobProcessor(object): assert op.status == constants.OP_STATUS_WAITLOCK + timeout = opctx.GetNextLockTimeout() + try: # Make sure not to hold queue lock while calling ExecOpCode result = self.opexec_fn(op.input, - _OpExecCallbacks(self.queue, self.job, op)) + _OpExecCallbacks(self.queue, self.job, op), + timeout=timeout) + except mcpu.LockAcquireTimeout: + assert timeout is not None, "Received timeout for blocking acquire" + logging.debug("Couldn't acquire locks in %0.6fs", timeout) + assert op.status == constants.OP_STATUS_WAITLOCK + return (constants.OP_STATUS_QUEUED, None) except CancelJob: logging.exception("%s: Canceling job", opctx.log_prefix) assert op.status == constants.OP_STATUS_CANCELING @@ -855,9 +935,10 @@ class _JobProcessor(object): opctx.log_prefix, opctx.summary) return (constants.OP_STATUS_SUCCESS, result) - def __call__(self): + def __call__(self, _nextop_fn=None): """Continues execution of a job. + @param _nextop_fn: Callback function for tests @rtype: bool @return: True if job is finished, False if processor needs to be called again @@ -872,7 +953,14 @@ class _JobProcessor(object): try: opcount = len(job.ops) - opctx = self._FindNextOpcode(job) + # Is a previous opcode still pending? 
+ if job.cur_opctx: + opctx = job.cur_opctx + else: + if __debug__ and _nextop_fn: + _nextop_fn() + opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) + op = opctx.op # Consistency check @@ -884,6 +972,9 @@ class _JobProcessor(object): constants.OP_STATUS_WAITLOCK, constants.OP_STATUS_CANCELED) + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) + if op.status != constants.OP_STATUS_CANCELED: # Prepare to start opcode self._MarkWaitlock(job, op) @@ -903,62 +994,87 @@ class _JobProcessor(object): finally: queue.acquire(shared=1) - # Finalize opcode - op.end_timestamp = TimeStampNow() op.status = op_status op.result = op_result - if op.status == constants.OP_STATUS_CANCELING: - assert not compat.any(i.status != constants.OP_STATUS_CANCELING - for i in job.ops[opctx.index:]) + if op.status == constants.OP_STATUS_QUEUED: + # Couldn't get locks in time + assert not op.end_timestamp else: - assert op.status in constants.OPS_FINALIZED + # Finalize opcode + op.end_timestamp = TimeStampNow() - # Ensure all opcodes so far have been successful - assert (opctx.index == 0 or - compat.all(i.status == constants.OP_STATUS_SUCCESS - for i in job.ops[:opctx.index])) + if op.status == constants.OP_STATUS_CANCELING: + assert not compat.any(i.status != constants.OP_STATUS_CANCELING + for i in job.ops[opctx.index:]) + else: + assert op.status in constants.OPS_FINALIZED - if op.status == constants.OP_STATUS_SUCCESS: + if op.status == constants.OP_STATUS_QUEUED: finalize = False - elif op.status == constants.OP_STATUS_ERROR: - # Ensure failed opcode has an exception as its result - assert errors.GetEncodedError(job.ops[opctx.index].result) + opctx.CheckPriorityIncrease() - to_encode = errors.OpExecError("Preceding opcode failed") - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - _EncodeOpError(to_encode)) - finalize = True + # Keep around for another round + job.cur_opctx = opctx - # Consistency check - assert 
compat.all(i.status == constants.OP_STATUS_ERROR and - errors.GetEncodedError(i.result) - for i in job.ops[opctx.index:]) + assert (op.priority <= constants.OP_PRIO_LOWEST and + op.priority >= constants.OP_PRIO_HIGHEST) - elif op.status == constants.OP_STATUS_CANCELING: - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, - "Job canceled by request") - finalize = True + # In no case must the status be finalized here + assert job.CalcStatus() == constants.JOB_STATUS_QUEUED - elif op.status == constants.OP_STATUS_CANCELED: - finalize = True + queue.UpdateJobUnlocked(job) else: - raise errors.ProgrammerError("Unknown status '%s'" % op.status) + # Ensure all opcodes so far have been successful + assert (opctx.index == 0 or + compat.all(i.status == constants.OP_STATUS_SUCCESS + for i in job.ops[:opctx.index])) + + # Reset context + job.cur_opctx = None + + if op.status == constants.OP_STATUS_SUCCESS: + finalize = False + + elif op.status == constants.OP_STATUS_ERROR: + # Ensure failed opcode has an exception as its result + assert errors.GetEncodedError(job.ops[opctx.index].result) + + to_encode = errors.OpExecError("Preceding opcode failed") + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, + _EncodeOpError(to_encode)) + finalize = True - # Finalizing or last opcode? - if finalize or opctx.index == (opcount - 1): - # All opcodes have been run, finalize job - job.end_timestamp = TimeStampNow() + # Consistency check + assert compat.all(i.status == constants.OP_STATUS_ERROR and + errors.GetEncodedError(i.result) + for i in job.ops[opctx.index:]) - # Write to disk. If the job status is final, this is the final write - # allowed. Once the file has been written, it can be archived anytime. 
- queue.UpdateJobUnlocked(job) + elif op.status == constants.OP_STATUS_CANCELING: + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, + "Job canceled by request") + finalize = True + + elif op.status == constants.OP_STATUS_CANCELED: + finalize = True + + else: + raise errors.ProgrammerError("Unknown status '%s'" % op.status) + + # Finalizing or last opcode? + if finalize or opctx.index == (opcount - 1): + # All opcodes have been run, finalize job + job.end_timestamp = TimeStampNow() + + # Write to disk. If the job status is final, this is the final write + # allowed. Once the file has been written, it can be archived anytime. + queue.UpdateJobUnlocked(job) - if finalize or opctx.index == (opcount - 1): - logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) - return True + if finalize or opctx.index == (opcount - 1): + logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) + return True return False finally: @@ -988,7 +1104,7 @@ class _JobQueueWorker(workerpool.BaseWorker): if not _JobProcessor(queue, proc.ExecOpCode, job)(): # Schedule again - raise workerpool.DeferTask() + raise workerpool.DeferTask(priority=job.CalcPriority()) class _JobQueueWorkerPool(workerpool.WorkerPool): diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index d573378..ba56ae5 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -27,6 +27,7 @@ import unittest import tempfile import shutil import errno +import itertools from ganeti import constants from ganeti import utils @@ -34,6 +35,7 @@ from ganeti import errors from ganeti import jqueue from ganeti import opcodes from ganeti import compat +from ganeti import mcpu import testutils @@ -447,11 +449,11 @@ class _FakeExecOpCodeForProc: self._before_start = before_start self._after_start = after_start - def __call__(self, op, cbs): + def __call__(self, op, cbs, timeout=None): assert isinstance(op, opcodes.OpTestDummy) if self._before_start: - 
self._before_start() + self._before_start(timeout) cbs.NotifyStart() @@ -464,7 +466,7 @@ class _FakeExecOpCodeForProc: return op.result -class TestJobProcessor(unittest.TestCase): +class _JobProcessorTestUtils: def _CreateJob(self, queue, job_id, ops): job = jqueue._QueuedJob(queue, job_id, ops) self.assertFalse(job.start_timestamp) @@ -475,6 +477,8 @@ class TestJobProcessor(unittest.TestCase): self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in ops]]) return job + +class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): def _GenericCheckJob(self, job): assert compat.all(isinstance(op.input, opcodes.OpTestDummy) for op in job.ops) @@ -502,7 +506,7 @@ class TestJobProcessor(unittest.TestCase): # Create job job = self._CreateJob(queue, job_id, ops) - def _BeforeStart(): + def _BeforeStart(_): self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -656,7 +660,7 @@ class TestJobProcessor(unittest.TestCase): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - def _BeforeStart(): + def _BeforeStart(_): self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -845,7 +849,7 @@ class TestJobProcessor(unittest.TestCase): # Create job job = self._CreateJob(queue, 29386, ops) - def _BeforeStart(): + def _BeforeStart(_): self.assertFalse(queue.IsAcquired()) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) @@ -922,5 +926,190 @@ class TestJobProcessor(unittest.TestCase): self.assertFalse(job.GetLogEntries(count + 3)) +class _FakeTimeoutStrategy: + def __init__(self, timeouts): + self.timeouts = timeouts + self.attempts = 0 + self.last_timeout = None + + def NextAttempt(self): + self.attempts += 1 + if self.timeouts: + timeout = self.timeouts.pop(0) + else: + timeout = None + self.last_timeout = timeout + return timeout + + +class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): + def setUp(self): + 
self.queue = _FakeQueueForProc() + self.job = None + self.curop = None + self.opcounter = None + self.timeout_strategy = None + self.retries = 0 + self.prev_tsop = None + self.prev_prio = None + self.gave_lock = None + self.done_lock_before_blocking = False + + def _BeforeStart(self, timeout): + job = self.job + + self.assertFalse(self.queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) + + ts = self.timeout_strategy + + self.assert_(timeout is None or isinstance(timeout, (int, float))) + self.assertEqual(timeout, ts.last_timeout) + + self.gave_lock = True + + if (self.curop == 3 and + job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3): + # Give locks before running into blocking acquire + assert self.retries == 7 + self.retries = 0 + self.done_lock_before_blocking = True + return + + if self.retries > 0: + self.assert_(timeout is not None) + self.retries -= 1 + self.gave_lock = False + raise mcpu.LockAcquireTimeout() + + if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST: + assert self.retries == 0, "Didn't exhaust all retries at highest priority" + assert not ts.timeouts + self.assert_(timeout is None) + + def _AfterStart(self, op, cbs): + job = self.job + + self.assertFalse(self.queue.IsAcquired()) + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) + + # Job is running, cancelling shouldn't be possible + (success, _) = job.Cancel() + self.assertFalse(success) + + def _NextOpcode(self): + self.curop = self.opcounter.next() + self.prev_prio = self.job.ops[self.curop].priority + + def _NewTimeoutStrategy(self): + job = self.job + + self.assertEqual(self.retries, 0) + + if self.prev_tsop == self.curop: + # Still on the same opcode, priority must've been increased + self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1) + + if self.curop == 1: + # Normal retry + timeouts = range(10, 31, 10) + self.retries = len(timeouts) - 1 + + elif self.curop == 2: + # Let this run into a blocking 
acquire + timeouts = range(11, 61, 12) + self.retries = len(timeouts) + + elif self.curop == 3: + # Wait for priority to increase, but give lock before blocking acquire + timeouts = range(12, 100, 14) + self.retries = len(timeouts) + + self.assertFalse(self.done_lock_before_blocking) + + elif self.curop == 4: + self.assert_(self.done_lock_before_blocking) + + # Timeouts, but no need to retry + timeouts = range(10, 31, 10) + self.retries = 0 + + elif self.curop == 5: + # Normal retry + timeouts = range(19, 100, 11) + self.retries = len(timeouts) + + else: + timeouts = [] + self.retries = 0 + + assert len(job.ops) == 10 + assert self.retries <= len(timeouts) + + ts = _FakeTimeoutStrategy(timeouts) + + self.timeout_strategy = ts + self.prev_tsop = self.curop + self.prev_prio = job.ops[self.curop].priority + + return ts + + def testTimeout(self): + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) + for i in range(10)] + + # Create job + job_id = 15801 + job = self._CreateJob(self.queue, job_id, ops) + self.job = job + + self.opcounter = itertools.count(0) + + opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart) + tsf = self._NewTimeoutStrategy + + self.assertFalse(self.done_lock_before_blocking) + + for i in itertools.count(0): + proc = jqueue._JobProcessor(self.queue, opexec, job, + _timeout_strategy_factory=tsf) + + result = proc(_nextop_fn=self._NextOpcode) + if result: + self.assertFalse(job.cur_opctx) + break + + self.assertFalse(result) + + if self.gave_lock: + self.assertFalse(job.cur_opctx) + else: + self.assert_(job.cur_opctx) + self.assertEqual(job.cur_opctx._timeout_strategy._fn, + self.timeout_strategy.NextAttempt) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assert_(job.start_timestamp) + self.assertFalse(job.end_timestamp) + + self.assertEqual(self.curop, len(job.ops) - 1) + self.assertEqual(self.job, job) + self.assertEqual(self.opcounter.next(), len(job.ops)) + 
self.assert_(self.done_lock_before_blocking) + + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) + self.assertEqual(job.GetInfo(["opresult"]), + [[op.input.result for op in job.ops]]) + self.assertEqual(job.GetInfo(["opstatus"]), + [len(job.ops) * [constants.OP_STATUS_SUCCESS]]) + self.assert_(compat.all(op.start_timestamp and op.end_timestamp + for op in job.ops)) + + # Finished jobs can't be processed any further + self.assertRaises(errors.ProgrammerError, + jqueue._JobProcessor(self.queue, opexec, job)) + + if __name__ == "__main__": testutils.GanetiTestProgram() -- 1.7.0.4
