When a job was cancelled, its status would be changed and the file written again. Since this was a final status, the job file could be moved at any time for archival. If the job was still in the queue, however, it would nevertheless be processed (not fully — only the "end_timestamp" attribute was updated) and written again. This was a problem because it could leave the same job in two different files.
With this patch the processor is changed to return early for finished jobs. Cancelling a queued job will finalize it right away. Unittests are updated. Signed-off-by: Michael Hanselmann <[email protected]> --- lib/jqueue.py | 62 ++++++++++++++++++++++----------------- test/ganeti.jqueue_unittest.py | 45 +++++++++++++++++++--------- 2 files changed, 65 insertions(+), 42 deletions(-) diff --git a/lib/jqueue.py b/lib/jqueue.py index ddf941e..974746e 100644 --- a/lib/jqueue.py +++ b/lib/jqueue.py @@ -426,6 +426,12 @@ class _QueuedJob(object): op.result = result not_marked = False + def Finalize(self): + """Marks the job as finalized. + + """ + self.end_timestamp = TimeStampNow() + def Cancel(self): """Marks job as canceled/-ing if possible. @@ -439,6 +445,7 @@ class _QueuedJob(object): if status == constants.JOB_STATUS_QUEUED: self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, "Job canceled by request") + self.Finalize() return (True, "Job %s canceled" % self.id) elif status == constants.JOB_STATUS_WAITLOCK: @@ -866,21 +873,15 @@ class _JobProcessor(object): opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)), timeout_strategy_factory) - if op.status == constants.OP_STATUS_CANCELED: - # Cancelled jobs are handled by the caller - assert not compat.any(i.status != constants.OP_STATUS_CANCELED - for i in job.ops[idx:]) - - elif op.status in constants.OPS_FINALIZED: - # This is a job that was partially completed before master daemon - # shutdown, so it can be expected that some opcodes are already - # completed successfully (if any did error out, then the whole job - # should have been aborted and not resubmitted for processing). 
- logging.info("%s: opcode %s already processed, skipping", - opctx.log_prefix, opctx.summary) - continue + if op.status not in constants.OPS_FINALIZED: + return opctx - return opctx + # This is a job that was partially completed before master daemon + # shutdown, so it can be expected that some opcodes are already + # completed successfully (if any did error out, then the whole job + # should have been aborted and not resubmitted for processing). + logging.info("%s: opcode %s already processed, skipping", + opctx.log_prefix, opctx.summary) @staticmethod def _MarkWaitlock(job, op): @@ -977,6 +978,10 @@ class _JobProcessor(object): try: opcount = len(job.ops) + # Don't do anything for finalized jobs + if job.CalcStatus() in constants.JOBS_FINALIZED: + return True + # Is a previous opcode still pending? if job.cur_opctx: opctx = job.cur_opctx @@ -990,20 +995,17 @@ class _JobProcessor(object): # Consistency check assert compat.all(i.status in (constants.OP_STATUS_QUEUED, - constants.OP_STATUS_CANCELING, - constants.OP_STATUS_CANCELED) + constants.OP_STATUS_CANCELING) for i in job.ops[opctx.index + 1:]) assert op.status in (constants.OP_STATUS_QUEUED, constants.OP_STATUS_WAITLOCK, - constants.OP_STATUS_CANCELING, - constants.OP_STATUS_CANCELED) + constants.OP_STATUS_CANCELING) assert (op.priority <= constants.OP_PRIO_LOWEST and op.priority >= constants.OP_PRIO_HIGHEST) - if op.status not in (constants.OP_STATUS_CANCELING, - constants.OP_STATUS_CANCELED): + if op.status != constants.OP_STATUS_CANCELING: assert op.status in (constants.OP_STATUS_QUEUED, constants.OP_STATUS_WAITLOCK) @@ -1088,22 +1090,22 @@ class _JobProcessor(object): "Job canceled by request") finalize = True - elif op.status == constants.OP_STATUS_CANCELED: - finalize = True - else: raise errors.ProgrammerError("Unknown status '%s'" % op.status) - # Finalizing or last opcode? 
- if finalize or opctx.index == (opcount - 1): + if opctx.index == (opcount - 1): + # Finalize on last opcode + finalize = True + + if finalize: # All opcodes have been run, finalize job - job.end_timestamp = TimeStampNow() + job.Finalize() # Write to disk. If the job status is final, this is the final write # allowed. Once the file has been written, it can be archived anytime. queue.UpdateJobUnlocked(job) - if finalize or opctx.index == (opcount - 1): + if finalize: logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) return True @@ -1775,6 +1777,10 @@ class JobQueue(object): @param replicate: whether to replicate the change to remote nodes """ + if __debug__: + finalized = job.CalcStatus() in constants.JOBS_FINALIZED + assert (finalized ^ (job.end_timestamp is None)) + filename = self._GetJobPath(job.id) data = serializer.DumpJson(job.Serialize(), indent=False) logging.debug("Writing job %s to %s", job.id, filename) @@ -1832,6 +1838,8 @@ class JobQueue(object): (success, msg) = job.Cancel() if success: + # If the job was finalized (e.g. cancelled), this is the final write + # allowed. The job can be archived anytime. 
self.UpdateJobUnlocked(job) return (success, msg) diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py index 9137927..36e7cf7 100755 --- a/test/ganeti.jqueue_unittest.py +++ b/test/ganeti.jqueue_unittest.py @@ -565,9 +565,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._GenericCheckJob(job) - # Finished jobs can't be processed any further - self.assertRaises(errors.ProgrammerError, - jqueue._JobProcessor(queue, opexec, job)) + # Calling the processor on a finished job should be a no-op + self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) + self.assertRaises(IndexError, queue.GetNextUpdate) def testOpcodeError(self): queue = _FakeQueueForProc() @@ -643,9 +643,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._GenericCheckJob(job) - # Finished jobs can't be processed any further - self.assertRaises(errors.ProgrammerError, - jqueue._JobProcessor(queue, opexec, job)) + # Calling the processor on a finished job should be a no-op + self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) + self.assertRaises(IndexError, queue.GetNextUpdate) def testCancelWhileInQueue(self): queue = _FakeQueueForProc() @@ -665,9 +665,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertRaises(IndexError, queue.GetNextUpdate) + self.assertFalse(job.start_timestamp) + self.assertTrue(job.end_timestamp) self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED for op in job.ops)) + # Serialize to check for differences + before_proc = job.Serialize() + + # Simulate processor called in workerpool opexec = _FakeExecOpCodeForProc(queue, None, None) self.assert_(jqueue._JobProcessor(queue, opexec, job)()) @@ -675,13 +681,17 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED) self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED]) self.assertFalse(job.start_timestamp) 
- self.assert_(job.end_timestamp) + self.assertTrue(job.end_timestamp) self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp for op in job.ops)) self.assertEqual(job.GetInfo(["opstatus", "opresult"]), [[constants.OP_STATUS_CANCELED for _ in job.ops], ["Job canceled by request" for _ in job.ops]]) + # Must not have changed or written + self.assertEqual(before_proc, job.Serialize()) + self.assertRaises(IndexError, queue.GetNextUpdate) + def testCancelWhileWaitlockInQueue(self): queue = _FakeQueueForProc() @@ -903,6 +913,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): for remaining in reversed(range(len(job.ops) - successcount)): result = jqueue._JobProcessor(queue, opexec, job)() + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertEqual(queue.GetNextUpdate(), (job, True)) + self.assertRaises(IndexError, queue.GetNextUpdate) if remaining == 0: # Last opcode @@ -913,6 +927,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) + self.assertRaises(IndexError, queue.GetNextUpdate) self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS) self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS]) self.assertEqual(job.GetInfo(["opresult"]), @@ -924,14 +939,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._GenericCheckJob(job) - # Finished jobs can't be processed any further - self.assertRaises(errors.ProgrammerError, - jqueue._JobProcessor(queue, opexec, job)) + # Calling the processor on a finished job should be a no-op + self.assertTrue(jqueue._JobProcessor(queue, opexec, job)()) + self.assertRaises(IndexError, queue.GetNextUpdate) # ... 
also after being restored job2 = jqueue._QueuedJob.Restore(queue, job.Serialize()) - self.assertRaises(errors.ProgrammerError, - jqueue._JobProcessor(queue, opexec, job2)) + self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)()) + self.assertRaises(IndexError, queue.GetNextUpdate) def testProcessorOnRunningJob(self): ops = [opcodes.OpTestDummy(result="result", fail=False)] @@ -1293,9 +1308,9 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils): self.assert_(compat.all(op.start_timestamp and op.end_timestamp for op in job.ops)) - # Finished jobs can't be processed any further - self.assertRaises(errors.ProgrammerError, - jqueue._JobProcessor(self.queue, opexec, job)) + # Calling the processor on a finished job should be a no-op + self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)()) + self.assertRaises(IndexError, self.queue.GetNextUpdate) if __name__ == "__main__": -- 1.7.3.5
