Given that the Python job queue now only handles a single job, there is no need to keep track of whether it accepts new jobs. Hence remove that code. Also remove the corresponding tests.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/jqueue/__init__.py | 40 -------- test/py/ganeti.jqueue_unittest.py | 190 -------------------------------------- 2 files changed, 230 deletions(-) diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py index 0258f9d..1f61d58 100644 --- a/lib/jqueue/__init__.py +++ b/lib/jqueue/__init__.py @@ -83,12 +83,6 @@ class CancelJob(Exception): """ -class QueueShutdown(Exception): - """Special exception to abort a job when the job queue is shutting down. - - """ - - def TimeStampNow(): """Returns the current timestamp. @@ -612,11 +606,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase): logging.debug("Canceling opcode") raise CancelJob() - # See if queue is shutting down - if not self._queue.AcceptingJobsUnlocked(): - logging.debug("Queue is shutting down") - raise QueueShutdown() - @locking.ssynchronized(_QUEUE, shared=1) def NotifyStart(self): """Mark the opcode as running, not lock-waiting. @@ -1168,10 +1157,6 @@ class _JobProcessor(object): if op.status == constants.OP_STATUS_CANCELING: return (constants.OP_STATUS_CANCELING, None) - # Queue is shutting down, return to queued - if not self.queue.AcceptingJobsUnlocked(): - return (constants.OP_STATUS_QUEUED, None) - # Stay in waitlock while trying to re-acquire lock return (constants.OP_STATUS_WAITING, None) except CancelJob: @@ -1179,14 +1164,6 @@ class _JobProcessor(object): assert op.status == constants.OP_STATUS_CANCELING return (constants.OP_STATUS_CANCELING, None) - except QueueShutdown: - logging.exception("%s: Queue is shutting down", opctx.log_prefix) - - assert op.status == constants.OP_STATUS_WAITING - - # Job hadn't been started yet, so it should return to the queue - return (constants.OP_STATUS_QUEUED, None) - except Exception, err: # pylint: disable=W0703 logging.exception("%s: Caught exception in %s", opctx.log_prefix, opctx.summary) @@ -1665,9 +1642,6 @@ class JobQueue(object): self.acquire = self._lock.acquire self.release = self._lock.release - # 
Accept jobs by default - self._accepting_jobs = True - # Read serial file self._last_serial = jstore.ReadSerial() assert self._last_serial is not None, ("Serial file was modified between" @@ -2512,22 +2486,8 @@ class JobQueue(object): @return: Whether there are any running jobs """ - if self._accepting_jobs: - self._accepting_jobs = False - return self._wpool.HasRunningTasks() - def AcceptingJobsUnlocked(self): - """Returns whether jobs are accepted. - - Once L{PrepareShutdown} has been called, no new jobs are accepted and the - queue is shutting down. - - @rtype: bool - - """ - return self._accepting_jobs - @locking.ssynchronized(_LOCK) def Shutdown(self): """Stops the job queue. diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py index 1aa3ec9..7ff14a0 100755 --- a/test/py/ganeti.jqueue_unittest.py +++ b/test/py/ganeti.jqueue_unittest.py @@ -715,7 +715,6 @@ class _FakeQueueForProc: self._acquired = False self._updates = [] self._submitted = [] - self._accepting_jobs = True self._submit_count = itertools.count(1000) @@ -751,12 +750,6 @@ class _FakeQueueForProc: self._submitted.extend(zip(job_ids, jobs)) return job_ids - def StopAcceptingJobs(self): - self._accepting_jobs = False - - def AcceptingJobsUnlocked(self): - return self._accepting_jobs - class _FakeExecOpCodeForProc: def __init__(self, queue, before_start, after_start): @@ -1154,10 +1147,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self._TestCancelWhileSomething(fn) - def testCancelDuringQueueShutdown(self): - queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs()) - self.assertFalse(queue.AcceptingJobsUnlocked()) - def testCancelWhileRunning(self): # Tests canceling a job with finished opcodes and more, unprocessed ones queue = _FakeQueueForProc() @@ -1204,185 +1193,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): ["Res0", "Job canceled by request", "Job canceled by request"]]) - def _TestQueueShutdown(self, 
queue, opexec, job, runcount): - self.assertTrue(queue.AcceptingJobsUnlocked()) - - # Simulate shutdown - queue.StopAcceptingJobs() - - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), - jqueue._JobProcessor.DEFER) - - # Check result - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED]) - self.assertFalse(job.cur_opctx) - self.assertTrue(job.start_timestamp) - self.assertFalse(job.end_timestamp) - self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp) - self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp - for op in job.ops[:runcount])) - self.assertFalse(job.ops[runcount].end_timestamp) - self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp - for op in job.ops[(runcount + 1):])) - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), - [(([constants.OP_STATUS_SUCCESS] * runcount) + - ([constants.OP_STATUS_QUEUED] * - (len(job.ops) - runcount))), - (["Res%s" % i for i in range(runcount)] + - ([None] * (len(job.ops) - runcount)))]) - - # Must have been written and replicated - self.assertEqual(queue.GetNextUpdate(), (job, True)) - self.assertRaises(IndexError, queue.GetNextUpdate) - - def testQueueShutdownWhileRunning(self): - # Tests shutting down the queue while a job is running - queue = _FakeQueueForProc() - - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) - for i in range(3)] - - # Create job - job_id = 2718211587 - job = self._CreateJob(queue, job_id, ops) - - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - - opexec = _FakeExecOpCodeForProc(queue, None, None) - - self.assertRaises(IndexError, queue.GetNextUpdate) - - # Run one opcode - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), - jqueue._JobProcessor.DEFER) - - # Job goes back to queued - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), - 
[[constants.OP_STATUS_SUCCESS, - constants.OP_STATUS_QUEUED, - constants.OP_STATUS_QUEUED], - ["Res0", None, None]]) - self.assertFalse(job.cur_opctx) - - # Writes for waiting, running and result - for _ in range(3): - self.assertEqual(queue.GetNextUpdate(), (job, True)) - - # Run second opcode - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), - jqueue._JobProcessor.DEFER) - - # Job goes back to queued - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), - [[constants.OP_STATUS_SUCCESS, - constants.OP_STATUS_SUCCESS, - constants.OP_STATUS_QUEUED], - ["Res0", "Res1", None]]) - self.assertFalse(job.cur_opctx) - - # Writes for waiting, running and result - for _ in range(3): - self.assertEqual(queue.GetNextUpdate(), (job, True)) - - self._TestQueueShutdown(queue, opexec, job, 2) - - def testQueueShutdownWithLockTimeout(self): - # Tests shutting down while a lock acquire times out - queue = _FakeQueueForProc() - - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) - for i in range(3)] - - # Create job - job_id = 1304231178 - job = self._CreateJob(queue, job_id, ops) - - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - - acquire_timeout = False - - def _BeforeStart(timeout, priority): - self.assertFalse(queue.IsAcquired()) - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) - if acquire_timeout: - raise mcpu.LockAcquireTimeout() - - opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None) - - self.assertRaises(IndexError, queue.GetNextUpdate) - - # Run one opcode - self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(), - jqueue._JobProcessor.DEFER) - - # Job goes back to queued - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - self.assertEqual(job.GetInfo(["opstatus", "opresult"]), - [[constants.OP_STATUS_SUCCESS, - constants.OP_STATUS_QUEUED, - constants.OP_STATUS_QUEUED], - ["Res0", None, None]]) - 
self.assertFalse(job.cur_opctx) - - # Writes for waiting, running and result - for _ in range(3): - self.assertEqual(queue.GetNextUpdate(), (job, True)) - - # The next opcode should have expiring lock acquires - acquire_timeout = True - - self._TestQueueShutdown(queue, opexec, job, 1) - - def testQueueShutdownWhileInQueue(self): - # This should never happen in reality (no new jobs are started by the - # workerpool once a shutdown has been initiated), but it's better to test - # the job processor for this scenario - queue = _FakeQueueForProc() - - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) - for i in range(5)] - - # Create job - job_id = 2031 - job = self._CreateJob(queue, job_id, ops) - - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - self.assertRaises(IndexError, queue.GetNextUpdate) - - self.assertFalse(job.start_timestamp) - self.assertFalse(job.end_timestamp) - self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED - for op in job.ops)) - - opexec = _FakeExecOpCodeForProc(queue, None, None) - self._TestQueueShutdown(queue, opexec, job, 0) - - def testQueueShutdownWhileWaitlockInQueue(self): - queue = _FakeQueueForProc() - - ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False) - for i in range(5)] - - # Create job - job_id = 53125685 - job = self._CreateJob(queue, job_id, ops) - - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED) - - job.ops[0].status = constants.OP_STATUS_WAITING - - assert len(job.ops) == 5 - - self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING) - - self.assertRaises(IndexError, queue.GetNextUpdate) - - opexec = _FakeExecOpCodeForProc(queue, None, None) - self._TestQueueShutdown(queue, opexec, job, 0) - def testPartiallyRun(self): # Tests calling the processor on a job that's been partially run before the # program was restarted -- 2.0.0.526.g5318336
