When a job was cancelled, its status was changed and the file was
written again. Since "canceled" is a final status, the job file could
be moved for archival at any time afterwards. If the job was still in
the queue, however, it would be processed once more (not fully, just
updating the "end_timestamp" attribute) and written again. This could
leave the same job in two different files, one archived and one newly
written.

With this patch the processor returns early for jobs that are already
finished, and cancelling a queued job finalizes it right away. Unit
tests are updated accordingly.
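
The gist of the change, condensed from the diff below (simplified
excerpts, not a literal copy of the patched code):

    # _QueuedJob.Cancel(): a queued job is now finalized immediately
    if status == constants.JOB_STATUS_QUEUED:
      self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                             "Job canceled by request")
      self.Finalize()  # sets end_timestamp, making the on-disk file final
      return (True, "Job %s canceled" % self.id)

    # _JobProcessor.__call__(): finalized jobs are left untouched
    if job.CalcStatus() in constants.JOBS_FINALIZED:
      return True  # no-op, nothing is written to disk again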

Signed-off-by: Michael Hanselmann <[email protected]>
---
 lib/jqueue.py                  |   62 ++++++++++++++++++++++-----------------
 test/ganeti.jqueue_unittest.py |   45 +++++++++++++++++++---------
 2 files changed, 65 insertions(+), 42 deletions(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index ddf941e..974746e 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -426,6 +426,12 @@ class _QueuedJob(object):
       op.result = result
       not_marked = False
 
+  def Finalize(self):
+    """Marks the job as finalized.
+
+    """
+    self.end_timestamp = TimeStampNow()
+
   def Cancel(self):
     """Marks job as canceled/-ing if possible.
 
@@ -439,6 +445,7 @@ class _QueuedJob(object):
     if status == constants.JOB_STATUS_QUEUED:
       self.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
                              "Job canceled by request")
+      self.Finalize()
       return (True, "Job %s canceled" % self.id)
 
     elif status == constants.JOB_STATUS_WAITLOCK:
@@ -866,21 +873,15 @@ class _JobProcessor(object):
       opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)),
                              timeout_strategy_factory)
 
-      if op.status == constants.OP_STATUS_CANCELED:
-        # Cancelled jobs are handled by the caller
-        assert not compat.any(i.status != constants.OP_STATUS_CANCELED
-                              for i in job.ops[idx:])
-
-      elif op.status in constants.OPS_FINALIZED:
-        # This is a job that was partially completed before master daemon
-        # shutdown, so it can be expected that some opcodes are already
-        # completed successfully (if any did error out, then the whole job
-        # should have been aborted and not resubmitted for processing).
-        logging.info("%s: opcode %s already processed, skipping",
-                     opctx.log_prefix, opctx.summary)
-        continue
+      if op.status not in constants.OPS_FINALIZED:
+        return opctx
 
-      return opctx
+      # This is a job that was partially completed before master daemon
+      # shutdown, so it can be expected that some opcodes are already
+      # completed successfully (if any did error out, then the whole job
+      # should have been aborted and not resubmitted for processing).
+      logging.info("%s: opcode %s already processed, skipping",
+                   opctx.log_prefix, opctx.summary)
 
   @staticmethod
   def _MarkWaitlock(job, op):
@@ -977,6 +978,10 @@ class _JobProcessor(object):
     try:
       opcount = len(job.ops)
 
+      # Don't do anything for finalized jobs
+      if job.CalcStatus() in constants.JOBS_FINALIZED:
+        return True
+
       # Is a previous opcode still pending?
       if job.cur_opctx:
         opctx = job.cur_opctx
@@ -990,20 +995,17 @@ class _JobProcessor(object):
 
       # Consistency check
       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
-                                     constants.OP_STATUS_CANCELING,
-                                     constants.OP_STATUS_CANCELED)
+                                     constants.OP_STATUS_CANCELING)
                         for i in job.ops[opctx.index + 1:])
 
       assert op.status in (constants.OP_STATUS_QUEUED,
                            constants.OP_STATUS_WAITLOCK,
-                           constants.OP_STATUS_CANCELING,
-                           constants.OP_STATUS_CANCELED)
+                           constants.OP_STATUS_CANCELING)
 
       assert (op.priority <= constants.OP_PRIO_LOWEST and
               op.priority >= constants.OP_PRIO_HIGHEST)
 
-      if op.status not in (constants.OP_STATUS_CANCELING,
-                           constants.OP_STATUS_CANCELED):
+      if op.status != constants.OP_STATUS_CANCELING:
         assert op.status in (constants.OP_STATUS_QUEUED,
                              constants.OP_STATUS_WAITLOCK)
 
@@ -1088,22 +1090,22 @@ class _JobProcessor(object):
                                 "Job canceled by request")
           finalize = True
 
-        elif op.status == constants.OP_STATUS_CANCELED:
-          finalize = True
-
         else:
           raise errors.ProgrammerError("Unknown status '%s'" % op.status)
 
-        # Finalizing or last opcode?
-        if finalize or opctx.index == (opcount - 1):
+        if opctx.index == (opcount - 1):
+          # Finalize on last opcode
+          finalize = True
+
+        if finalize:
           # All opcodes have been run, finalize job
-          job.end_timestamp = TimeStampNow()
+          job.Finalize()
 
         # Write to disk. If the job status is final, this is the final write
         # allowed. Once the file has been written, it can be archived anytime.
         queue.UpdateJobUnlocked(job)
 
-        if finalize or opctx.index == (opcount - 1):
+        if finalize:
           logging.info("Finished job %s, status = %s", job.id, 
job.CalcStatus())
           return True
 
@@ -1775,6 +1777,10 @@ class JobQueue(object):
     @param replicate: whether to replicate the change to remote nodes
 
     """
+    if __debug__:
+      finalized = job.CalcStatus() in constants.JOBS_FINALIZED
+      assert (finalized ^ (job.end_timestamp is None))
+
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize(), indent=False)
     logging.debug("Writing job %s to %s", job.id, filename)
@@ -1832,6 +1838,8 @@ class JobQueue(object):
     (success, msg) = job.Cancel()
 
     if success:
+      # If the job was finalized (e.g. cancelled), this is the final write
+      # allowed. The job can be archived anytime.
       self.UpdateJobUnlocked(job)
 
     return (success, msg)
diff --git a/test/ganeti.jqueue_unittest.py b/test/ganeti.jqueue_unittest.py
index 9137927..36e7cf7 100755
--- a/test/ganeti.jqueue_unittest.py
+++ b/test/ganeti.jqueue_unittest.py
@@ -565,9 +565,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       self._GenericCheckJob(job)
 
-      # Finished jobs can't be processed any further
-      self.assertRaises(errors.ProgrammerError,
-                        jqueue._JobProcessor(queue, opexec, job))
+      # Calling the processor on a finished job should be a no-op
+      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testOpcodeError(self):
     queue = _FakeQueueForProc()
@@ -643,9 +643,9 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       self._GenericCheckJob(job)
 
-      # Finished jobs can't be processed any further
-      self.assertRaises(errors.ProgrammerError,
-                        jqueue._JobProcessor(queue, opexec, job))
+      # Calling the processor on a finished job should be a no-op
+      self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+      self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testCancelWhileInQueue(self):
     queue = _FakeQueueForProc()
@@ -665,9 +665,15 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     self.assertRaises(IndexError, queue.GetNextUpdate)
 
+    self.assertFalse(job.start_timestamp)
+    self.assertTrue(job.end_timestamp)
     self.assert_(compat.all(op.status == constants.OP_STATUS_CANCELED
                             for op in job.ops))
 
+    # Serialize to check for differences
+    before_proc = job.Serialize()
+
+    # Simulate processor called in workerpool
     opexec = _FakeExecOpCodeForProc(queue, None, None)
     self.assert_(jqueue._JobProcessor(queue, opexec, job)())
 
@@ -675,13 +681,17 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_CANCELED)
     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_CANCELED])
     self.assertFalse(job.start_timestamp)
-    self.assert_(job.end_timestamp)
+    self.assertTrue(job.end_timestamp)
     self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
                                 for op in job.ops))
     self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
                      [[constants.OP_STATUS_CANCELED for _ in job.ops],
                       ["Job canceled by request" for _ in job.ops]])
 
+    # Must not have changed or written
+    self.assertEqual(before_proc, job.Serialize())
+    self.assertRaises(IndexError, queue.GetNextUpdate)
+
   def testCancelWhileWaitlockInQueue(self):
     queue = _FakeQueueForProc()
 
@@ -903,6 +913,10 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     for remaining in reversed(range(len(job.ops) - successcount)):
       result = jqueue._JobProcessor(queue, opexec, job)()
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertEqual(queue.GetNextUpdate(), (job, True))
+      self.assertRaises(IndexError, queue.GetNextUpdate)
 
       if remaining == 0:
         # Last opcode
@@ -913,6 +927,7 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
       self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
 
+    self.assertRaises(IndexError, queue.GetNextUpdate)
     self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_SUCCESS)
     self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_SUCCESS])
     self.assertEqual(job.GetInfo(["opresult"]),
@@ -924,14 +939,14 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     self._GenericCheckJob(job)
 
-    # Finished jobs can't be processed any further
-    self.assertRaises(errors.ProgrammerError,
-                      jqueue._JobProcessor(queue, opexec, job))
+    # Calling the processor on a finished job should be a no-op
+    self.assertTrue(jqueue._JobProcessor(queue, opexec, job)())
+    self.assertRaises(IndexError, queue.GetNextUpdate)
 
     # ... also after being restored
     job2 = jqueue._QueuedJob.Restore(queue, job.Serialize())
-    self.assertRaises(errors.ProgrammerError,
-                      jqueue._JobProcessor(queue, opexec, job2))
+    self.assertTrue(jqueue._JobProcessor(queue, opexec, job2)())
+    self.assertRaises(IndexError, queue.GetNextUpdate)
 
   def testProcessorOnRunningJob(self):
     ops = [opcodes.OpTestDummy(result="result", fail=False)]
@@ -1293,9 +1308,9 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
     self.assert_(compat.all(op.start_timestamp and op.end_timestamp
                             for op in job.ops))
 
-    # Finished jobs can't be processed any further
-    self.assertRaises(errors.ProgrammerError,
-                      jqueue._JobProcessor(self.queue, opexec, job))
+    # Calling the processor on a finished job should be a no-op
+    self.assertTrue(jqueue._JobProcessor(self.queue, opexec, job)())
+    self.assertRaises(IndexError, self.queue.GetNextUpdate)
 
 
 if __name__ == "__main__":
-- 
1.7.3.5
