On Thu, Sep 23, 2010 at 3:05 PM, Michael Hanselmann <[email protected]> wrote: > As already noted in the design document, an opcode's priority is > increased when the lock(s) can't be acquired within a certain amount of > time, except at the highest priority, where in such a case a blocking > acquire is used. > > A unittest is provided. Priorities are not yet used for acquiring the > lock(s)—this will need further changes on mcpu. > > Signed-off-by: Michael Hanselmann <[email protected]> > --- > daemons/ganeti-masterd | 3 + > lib/jqueue.py | 212 > +++++++++++++++++++++++++++++++--------- > test/ganeti.jqueue_unittest.py | 201 ++++++++++++++++++++++++++++++++++++- > 3 files changed, 362 insertions(+), 54 deletions(-) > > diff --git a/daemons/ganeti-masterd b/daemons/ganeti-masterd > index d627c47..8c32504 100755 > --- a/daemons/ganeti-masterd > +++ b/daemons/ganeti-masterd > @@ -314,6 +314,9 @@ class ClientOps: > """ > # Queries don't have a job id > proc = mcpu.Processor(self.server.context, None) > + > + # TODO: Executing an opcode using locks will acquire them in blocking > mode. > + # Consider using a timeout for retries. 
> return proc.ExecOpCode(op, None) > > > diff --git a/lib/jqueue.py b/lib/jqueue.py > index c7fd4c7..7411f46 100644 > --- a/lib/jqueue.py > +++ b/lib/jqueue.py > @@ -176,7 +176,7 @@ class _QueuedJob(object): > > """ > # pylint: disable-msg=W0212 > - __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", > + __slots__ = ["queue", "id", "ops", "log_serial", "ops_iter", "cur_opctx", > "received_timestamp", "start_timestamp", "end_timestamp", > "__weakref__"] > > @@ -211,6 +211,7 @@ class _QueuedJob(object): > > """ > obj.ops_iter = None > + obj.cur_opctx = None > > def __repr__(self): > status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__), > @@ -745,8 +746,40 @@ def _EncodeOpError(err): > return errors.EncodeException(to_encode) > > > +class _TimeoutStrategyWrapper: > + def __init__(self, fn): > + """Initializes this class. > + > + """ > + self._fn = fn > + self._next = None > + > + def _Advance(self): > + """Gets the next timeout if necessary. > + > + """ > + if self._next is None: > + self._next = self._fn() > + > + def Peek(self): > + """Returns the next timeout. > + > + """ > + self._Advance() > + return self._next > + > + def Next(self): > + """Returns the current timeout and advances the internal state. > + > + """ > + self._Advance() > + result = self._next > + self._next = None > + return result > + > + > class _OpExecContext: > - def __init__(self, op, index, log_prefix): > + def __init__(self, op, index, log_prefix, timeout_strategy_factory): > """Initializes this class. > > """ > @@ -755,22 +788,60 @@ class _OpExecContext: > self.log_prefix = log_prefix > self.summary = op.input.Summary() > > + self._timeout_strategy_factory = timeout_strategy_factory > + self._ResetTimeoutStrategy() > + > + def _ResetTimeoutStrategy(self): > + """Creates a new timeout strategy. 
> + > + """ > + self._timeout_strategy = \ > + _TimeoutStrategyWrapper(self._timeout_strategy_factory().NextAttempt) > + > + def CheckPriorityIncrease(self): > + """Checks whether priority can and should be increased. > + > + Called when locks couldn't be acquired. > + > + """ > + op = self.op > + > + # Exhausted all retries and next round should not use blocking acquire > + # for locks? > + if (self._timeout_strategy.Peek() is None and > + op.priority > constants.OP_PRIO_HIGHEST): > + logging.debug("Increasing priority") > + op.priority -= 1 > + self._ResetTimeoutStrategy() > + return True > + > + return False > + > + def GetNextLockTimeout(self): > + """Returns the next lock acquire timeout. > + > + """ > + return self._timeout_strategy.Next() > + > > class _JobProcessor(object): > - def __init__(self, queue, opexec_fn, job): > + def __init__(self, queue, opexec_fn, job, > + _timeout_strategy_factory=mcpu.LockAttemptTimeoutStrategy): > """Initializes this class. > > """ > self.queue = queue > self.opexec_fn = opexec_fn > self.job = job > + self._timeout_strategy_factory = _timeout_strategy_factory > > @staticmethod > - def _FindNextOpcode(job): > + def _FindNextOpcode(job, timeout_strategy_factory): > """Locates the next opcode to run. 
> > @type job: L{_QueuedJob} > @param job: Job object > + @param timeout_strategy_factory: Callable to create new timeout > strategy > > """ > # Create some sort of a cache to speed up locating next opcode for future > @@ -791,7 +862,8 @@ class _JobProcessor(object): > # Found an opcode already marked as running > raise errors.ProgrammerError("Called for job marked as running") > > - opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops))) > + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), > + timeout_strategy_factory) > > if op.status == constants.OP_STATUS_CANCELED: > # Cancelled jobs are handled by the caller > @@ -838,10 +910,18 @@ class _JobProcessor(object): > > assert op.status == constants.OP_STATUS_WAITLOCK > > + timeout = opctx.GetNextLockTimeout() > + > try: > # Make sure not to hold queue lock while calling ExecOpCode > result = self.opexec_fn(op.input, > - _OpExecCallbacks(self.queue, self.job, op)) > + _OpExecCallbacks(self.queue, self.job, op), > + timeout=timeout) > + except mcpu.LockAcquireTimeout: > + assert timeout is not None, "Received timeout for blocking acquire" > + logging.debug("Couldn't acquire locks in %0.6fs", timeout) > + assert op.status == constants.OP_STATUS_WAITLOCK > + return (constants.OP_STATUS_QUEUED, None) > except CancelJob: > logging.exception("%s: Canceling job", opctx.log_prefix) > assert op.status == constants.OP_STATUS_CANCELING > @@ -855,9 +935,10 @@ class _JobProcessor(object): > opctx.log_prefix, opctx.summary) > return (constants.OP_STATUS_SUCCESS, result) > > - def __call__(self): > + def __call__(self, _nextop_fn=None): > """Continues execution of a job. > > + @param _nextop_fn: Callback function for tests > @rtype: bool > @return: True if job is finished, False if processor needs to be called > again > @@ -872,7 +953,14 @@ class _JobProcessor(object): > try: > opcount = len(job.ops) > > - opctx = self._FindNextOpcode(job) > + # Is a previous opcode still pending? 
> + if job.cur_opctx: > + opctx = job.cur_opctx > + else: > + if __debug__ and _nextop_fn: > + _nextop_fn() > + opctx = self._FindNextOpcode(job, self._timeout_strategy_factory) > + > op = opctx.op > > # Consistency check > @@ -884,6 +972,9 @@ class _JobProcessor(object): > constants.OP_STATUS_WAITLOCK, > constants.OP_STATUS_CANCELED) > > + assert (op.priority <= constants.OP_PRIO_LOWEST and > + op.priority >= constants.OP_PRIO_HIGHEST) > + > if op.status != constants.OP_STATUS_CANCELED: > # Prepare to start opcode > self._MarkWaitlock(job, op) > @@ -903,62 +994,87 @@ class _JobProcessor(object): > finally: > queue.acquire(shared=1) > > - # Finalize opcode > - op.end_timestamp = TimeStampNow() > op.status = op_status > op.result = op_result > > - if op.status == constants.OP_STATUS_CANCELING: > - assert not compat.any(i.status != constants.OP_STATUS_CANCELING > - for i in job.ops[opctx.index:]) > + if op.status == constants.OP_STATUS_QUEUED: > + # Couldn't get locks in time > + assert not op.end_timestamp > else: > - assert op.status in constants.OPS_FINALIZED > + # Finalize opcode > + op.end_timestamp = TimeStampNow() > > - # Ensure all opcodes so far have been successful > - assert (opctx.index == 0 or > - compat.all(i.status == constants.OP_STATUS_SUCCESS > - for i in job.ops[:opctx.index])) > + if op.status == constants.OP_STATUS_CANCELING: > + assert not compat.any(i.status != constants.OP_STATUS_CANCELING > + for i in job.ops[opctx.index:]) > + else: > + assert op.status in constants.OPS_FINALIZED > > - if op.status == constants.OP_STATUS_SUCCESS: > + if op.status == constants.OP_STATUS_QUEUED: > finalize = False > > - elif op.status == constants.OP_STATUS_ERROR: > - # Ensure failed opcode has an exception as its result > - assert errors.GetEncodedError(job.ops[opctx.index].result) > + opctx.CheckPriorityIncrease() > > - to_encode = errors.OpExecError("Preceding opcode failed") > - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, > - 
_EncodeOpError(to_encode)) > - finalize = True > + # Keep around for another round > + job.cur_opctx = opctx > > - # Consistency check > - assert compat.all(i.status == constants.OP_STATUS_ERROR and > - errors.GetEncodedError(i.result) > - for i in job.ops[opctx.index:]) > + assert (op.priority <= constants.OP_PRIO_LOWEST and > + op.priority >= constants.OP_PRIO_HIGHEST) > > - elif op.status == constants.OP_STATUS_CANCELING: > - job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, > - "Job canceled by request") > - finalize = True > + # In no case must the status be finalized here > + assert job.CalcStatus() == constants.JOB_STATUS_QUEUED > > - elif op.status == constants.OP_STATUS_CANCELED: > - finalize = True > + queue.UpdateJobUnlocked(job) > > else: > - raise errors.ProgrammerError("Unknown status '%s'" % op.status) > + # Ensure all opcodes so far have been successful > + assert (opctx.index == 0 or > + compat.all(i.status == constants.OP_STATUS_SUCCESS > + for i in job.ops[:opctx.index])) > + > + # Reset context > + job.cur_opctx = None > + > + if op.status == constants.OP_STATUS_SUCCESS: > + finalize = False > + > + elif op.status == constants.OP_STATUS_ERROR: > + # Ensure failed opcode has an exception as its result > + assert errors.GetEncodedError(job.ops[opctx.index].result) > + > + to_encode = errors.OpExecError("Preceding opcode failed") > + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, > + _EncodeOpError(to_encode)) > + finalize = True > > - # Finalizing or last opcode? > - if finalize or opctx.index == (opcount - 1): > - # All opcodes have been run, finalize job > - job.end_timestamp = TimeStampNow() > + # Consistency check > + assert compat.all(i.status == constants.OP_STATUS_ERROR and > + errors.GetEncodedError(i.result) > + for i in job.ops[opctx.index:]) > > - # Write to disk. If the job status is final, this is the final write > - # allowed. Once the file has been written, it can be archived anytime. 
> - queue.UpdateJobUnlocked(job) > + elif op.status == constants.OP_STATUS_CANCELING: > + job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, > + "Job canceled by request") > + finalize = True > + > + elif op.status == constants.OP_STATUS_CANCELED: > + finalize = True > + > + else: > + raise errors.ProgrammerError("Unknown status '%s'" % op.status) > + > + # Finalizing or last opcode? > + if finalize or opctx.index == (opcount - 1): > + # All opcodes have been run, finalize job > + job.end_timestamp = TimeStampNow() > + > + # Write to disk. If the job status is final, this is the final write > + # allowed. Once the file has been written, it can be archived > anytime. > + queue.UpdateJobUnlocked(job) > > - if finalize or opctx.index == (opcount - 1): > - logging.info("Finished job %s, status = %s", job.id, > job.CalcStatus()) > - return True > + if finalize or opctx.index == (opcount - 1): > + logging.info("Finished job %s, status = %s", job.id, > job.CalcStatus()) > + return True > > return False > finally: > @@ -988,7 +1104,7 @@ class _JobQueueWorker(workerpool.BaseWorker): > > if not _JobProcessor(queue, proc.ExecOpCode, job)(): > # Schedule again > - raise workerpool.DeferTask() > + raise workerpool.DeferTask(priority=job.CalcPriority()) > > > class _JobQueueWorkerPool(workerpool.WorkerPool): > diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py > index d573378..ba56ae5 100755 > --- a/test/ganeti.jqueue_unittest.py > +++ b/test/ganeti.jqueue_unittest.py > @@ -27,6 +27,7 @@ import unittest > import tempfile > import shutil > import errno > +import itertools > > from ganeti import constants > from ganeti import utils > @@ -34,6 +35,7 @@ from ganeti import errors > from ganeti import jqueue > from ganeti import opcodes > from ganeti import compat > +from ganeti import mcpu > > import testutils > > @@ -447,11 +449,11 @@ class _FakeExecOpCodeForProc: > self._before_start = before_start > self._after_start = after_start > > - def 
__call__(self, op, cbs): > + def __call__(self, op, cbs, timeout=None): > assert isinstance(op, opcodes.OpTestDummy) > > if self._before_start: > - self._before_start() > + self._before_start(timeout) > > cbs.NotifyStart() > > @@ -464,7 +466,7 @@ class _FakeExecOpCodeForProc: > return op.result > > > -class TestJobProcessor(unittest.TestCase): > +class _JobProcessorTestUtils: > def _CreateJob(self, queue, job_id, ops): > job = jqueue._QueuedJob(queue, job_id, ops) > self.assertFalse(job.start_timestamp) > @@ -475,6 +477,8 @@ class TestJobProcessor(unittest.TestCase): > self.assertEqual(job.GetInfo(["ops"]), [[op.__getstate__() for op in > ops]]) > return job > > + > +class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): > def _GenericCheckJob(self, job): > assert compat.all(isinstance(op.input, opcodes.OpTestDummy) > for op in job.ops) > @@ -502,7 +506,7 @@ class TestJobProcessor(unittest.TestCase): > # Create job > job = self._CreateJob(queue, job_id, ops) > > - def _BeforeStart(): > + def _BeforeStart(_): > self.assertFalse(queue.IsAcquired()) > self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) > > @@ -656,7 +660,7 @@ class TestJobProcessor(unittest.TestCase): > > self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > > - def _BeforeStart(): > + def _BeforeStart(_): > self.assertFalse(queue.IsAcquired()) > self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) > > @@ -845,7 +849,7 @@ class TestJobProcessor(unittest.TestCase): > # Create job > job = self._CreateJob(queue, 29386, ops) > > - def _BeforeStart(): > + def _BeforeStart(_): > self.assertFalse(queue.IsAcquired()) > self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) > > @@ -922,5 +926,190 @@ class TestJobProcessor(unittest.TestCase): > self.assertFalse(job.GetLogEntries(count + 3)) > > > +class _FakeTimeoutStrategy: > + def __init__(self, timeouts): > + self.timeouts = timeouts > + self.attempts = 0 > + self.last_timeout = None > + > + def 
NextAttempt(self): > + self.attempts += 1 > + if self.timeouts: > + timeout = self.timeouts.pop(0) > + else: > + timeout = None > + self.last_timeout = timeout > + return timeout > + > + > +class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): > + def setUp(self): > + self.queue = _FakeQueueForProc() > + self.job = None > + self.curop = None > + self.opcounter = None > + self.timeout_strategy = None > + self.retries = 0 > + self.prev_tsop = None > + self.prev_prio = None > + self.gave_lock = None > + self.done_lock_before_blocking = False > + > + def _BeforeStart(self, timeout): > + job = self.job > + > + self.assertFalse(self.queue.IsAcquired()) > + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITLOCK) > + > + ts = self.timeout_strategy > + > + self.assert_(timeout is None or isinstance(timeout, (int, float))) > + self.assertEqual(timeout, ts.last_timeout) > + > + self.gave_lock = True > + > + if (self.curop == 3 and > + job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST + 3): > + # Give locks before running into blocking acquire > + assert self.retries == 7 > + self.retries = 0 > + self.done_lock_before_blocking = True > + return > + > + if self.retries > 0: > + self.assert_(timeout is not None) > + self.retries -= 1 > + self.gave_lock = False > + raise mcpu.LockAcquireTimeout() > + > + if job.ops[self.curop].priority == constants.OP_PRIO_HIGHEST: > + assert self.retries == 0, "Didn't exhaust all retries at highest > priority" > + assert not ts.timeouts > + self.assert_(timeout is None) > + > + def _AfterStart(self, op, cbs): > + job = self.job > + > + self.assertFalse(self.queue.IsAcquired()) > + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING) > + > + # Job is running, cancelling shouldn't be possible > + (success, _) = job.Cancel() > + self.assertFalse(success) > + > + def _NextOpcode(self): > + self.curop = self.opcounter.next() > + self.prev_prio = self.job.ops[self.curop].priority > + > + def 
_NewTimeoutStrategy(self): > + job = self.job > + > + self.assertEqual(self.retries, 0) > + > + if self.prev_tsop == self.curop: > + # Still on the same opcode, priority must've been increased > + self.assertEqual(self.prev_prio, job.ops[self.curop].priority + 1) > + > + if self.curop == 1: > + # Normal retry > + timeouts = range(10, 31, 10) > + self.retries = len(timeouts) - 1 > + > + elif self.curop == 2: > + # Let this run into a blocking acquire > + timeouts = range(11, 61, 12) > + self.retries = len(timeouts) > + > + elif self.curop == 3: > + # Wait for priority to increase, but give lock before blocking acquire > + timeouts = range(12, 100, 14) > + self.retries = len(timeouts) > + > + self.assertFalse(self.done_lock_before_blocking) > + > + elif self.curop == 4: > + self.assert_(self.done_lock_before_blocking) > + > + # Timeouts, but no need to retry > + timeouts = range(10, 31, 10) > + self.retries = 0 > + > + elif self.curop == 5: > + # Normal retry > + timeouts = range(19, 100, 11) > + self.retries = len(timeouts) > + > + else: > + timeouts = [] > + self.retries = 0 > + > + assert len(job.ops) == 10 > + assert self.retries <= len(timeouts) > + > + ts = _FakeTimeoutStrategy(timeouts) > + > + self.timeout_strategy = ts > + self.prev_tsop = self.curop > + self.prev_prio = job.ops[self.curop].priority > + > + return ts > + > + def testTimeout(self): > + ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) > + for i in range(10)] > + > + # Create job > + job_id = 15801 > + job = self._CreateJob(self.queue, job_id, ops) > + self.job = job > + > + self.opcounter = itertools.count(0) > + > + opexec = _FakeExecOpCodeForProc(self._BeforeStart, self._AfterStart) > + tsf = self._NewTimeoutStrategy > + > + self.assertFalse(self.done_lock_before_blocking) > + > + for i in itertools.count(0): > + proc = jqueue._JobProcessor(self.queue, opexec, job, > + _timeout_strategy_factory=tsf) > + > + result = proc(_nextop_fn=self._NextOpcode) > + if result: > + 
self.assertFalse(job.cur_opctx) > + break > + > + self.assertFalse(result) > + > + if self.gave_lock: > + self.assertFalse(job.cur_opctx) > + else: > + self.assert_(job.cur_opctx) > + self.assertEqual(job.cur_opctx._timeout_strategy._fn, > + self.timeout_strategy.NextAttempt) > + > + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) > + self.assert_(job.start_timestamp) > + self.assertFalse(job.end_timestamp) > + > + self.assertEqual(self.curop, len(job.ops) - 1) > + self.assertEqual(self.job, job) > + self.assertEqual(self.opcounter.next(), len(job.ops)) > + self.assert_(self.done_lock_before_blocking) > + > + self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) > + self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) > + self.assertEqual(job.GetInfo(["opresult"]), > + [[op.input.result for op in job.ops]]) > + self.assertEqual(job.GetInfo(["opstatus"]), > + [len(job.ops) * [constants.OP_STATUS_SUCCESS]]) > + self.assert_(compat.all(op.start_timestamp and op.end_timestamp > + for op in job.ops)) > + > + # Finished jobs can't be processed any further > + self.assertRaises(errors.ProgrammerError, > + jqueue._JobProcessor(self.queue, opexec, job)) > + > + > if __name__ == "__main__": > testutils.GanetiTestProgram() > -- > 1.7.0.4
LGTM > >
