Given that the Python job queue now only handles a single job,
there is no need to keep track of whether it accepts new jobs.
Hence remove that code. Also remove the corresponding tests.

Signed-off-by: Klaus Aehlig <[email protected]>
---
 lib/jqueue/__init__.py            |  40 --------
 test/py/ganeti.jqueue_unittest.py | 190 --------------------------------------
 2 files changed, 230 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index 0258f9d..1f61d58 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -83,12 +83,6 @@ class CancelJob(Exception):
   """
 
 
-class QueueShutdown(Exception):
-  """Special exception to abort a job when the job queue is shutting down.
-
-  """
-
-
 def TimeStampNow():
   """Returns the current timestamp.
 
@@ -612,11 +606,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
       logging.debug("Canceling opcode")
       raise CancelJob()
 
-    # See if queue is shutting down
-    if not self._queue.AcceptingJobsUnlocked():
-      logging.debug("Queue is shutting down")
-      raise QueueShutdown()
-
   @locking.ssynchronized(_QUEUE, shared=1)
   def NotifyStart(self):
     """Mark the opcode as running, not lock-waiting.
@@ -1168,10 +1157,6 @@ class _JobProcessor(object):
       if op.status == constants.OP_STATUS_CANCELING:
         return (constants.OP_STATUS_CANCELING, None)
 
-      # Queue is shutting down, return to queued
-      if not self.queue.AcceptingJobsUnlocked():
-        return (constants.OP_STATUS_QUEUED, None)
-
       # Stay in waitlock while trying to re-acquire lock
       return (constants.OP_STATUS_WAITING, None)
     except CancelJob:
@@ -1179,14 +1164,6 @@ class _JobProcessor(object):
       assert op.status == constants.OP_STATUS_CANCELING
       return (constants.OP_STATUS_CANCELING, None)
 
-    except QueueShutdown:
-      logging.exception("%s: Queue is shutting down", opctx.log_prefix)
-
-      assert op.status == constants.OP_STATUS_WAITING
-
-      # Job hadn't been started yet, so it should return to the queue
-      return (constants.OP_STATUS_QUEUED, None)
-
     except Exception, err: # pylint: disable=W0703
       logging.exception("%s: Caught exception in %s",
                         opctx.log_prefix, opctx.summary)
@@ -1665,9 +1642,6 @@ class JobQueue(object):
     self.acquire = self._lock.acquire
     self.release = self._lock.release
 
-    # Accept jobs by default
-    self._accepting_jobs = True
-
     # Read serial file
     self._last_serial = jstore.ReadSerial()
     assert self._last_serial is not None, ("Serial file was modified between"
@@ -2512,22 +2486,8 @@ class JobQueue(object):
     @return: Whether there are any running jobs
 
     """
-    if self._accepting_jobs:
-      self._accepting_jobs = False
-
     return self._wpool.HasRunningTasks()
 
-  def AcceptingJobsUnlocked(self):
-    """Returns whether jobs are accepted.
-
-    Once L{PrepareShutdown} has been called, no new jobs are accepted and the
-    queue is shutting down.
-
-    @rtype: bool
-
-    """
-    return self._accepting_jobs
-
   @locking.ssynchronized(_LOCK)
   def Shutdown(self):
     """Stops the job queue.
diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py
index 1aa3ec9..7ff14a0 100755
--- a/test/py/ganeti.jqueue_unittest.py
+++ b/test/py/ganeti.jqueue_unittest.py
@@ -715,7 +715,6 @@ class _FakeQueueForProc:
     self._acquired = False
     self._updates = []
     self._submitted = []
-    self._accepting_jobs = True
 
     self._submit_count = itertools.count(1000)
 
@@ -751,12 +750,6 @@ class _FakeQueueForProc:
     self._submitted.extend(zip(job_ids, jobs))
     return job_ids
 
-  def StopAcceptingJobs(self):
-    self._accepting_jobs = False
-
-  def AcceptingJobsUnlocked(self):
-    return self._accepting_jobs
-
 
 class _FakeExecOpCodeForProc:
   def __init__(self, queue, before_start, after_start):
@@ -1154,10 +1147,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
 
     self._TestCancelWhileSomething(fn)
 
-  def testCancelDuringQueueShutdown(self):
-    queue = self._TestCancelWhileSomething(lambda q: q.StopAcceptingJobs())
-    self.assertFalse(queue.AcceptingJobsUnlocked())
-
   def testCancelWhileRunning(self):
     # Tests canceling a job with finished opcodes and more, unprocessed ones
     queue = _FakeQueueForProc()
@@ -1204,185 +1193,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
                       ["Res0", "Job canceled by request",
                        "Job canceled by request"]])
 
-  def _TestQueueShutdown(self, queue, opexec, job, runcount):
-    self.assertTrue(queue.AcceptingJobsUnlocked())
-
-    # Simulate shutdown
-    queue.StopAcceptingJobs()
-
-    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
-                     jqueue._JobProcessor.DEFER)
-
-    # Check result
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-    self.assertEqual(job.GetInfo(["status"]), [constants.JOB_STATUS_QUEUED])
-    self.assertFalse(job.cur_opctx)
-    self.assertTrue(job.start_timestamp)
-    self.assertFalse(job.end_timestamp)
-    self.assertEqual(job.start_timestamp, job.ops[0].start_timestamp)
-    self.assertTrue(compat.all(op.start_timestamp and op.end_timestamp
-                               for op in job.ops[:runcount]))
-    self.assertFalse(job.ops[runcount].end_timestamp)
-    self.assertFalse(compat.any(op.start_timestamp or op.end_timestamp
-                                for op in job.ops[(runcount + 1):]))
-    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
-                     [(([constants.OP_STATUS_SUCCESS] * runcount) +
-                       ([constants.OP_STATUS_QUEUED] *
-                        (len(job.ops) - runcount))),
-                      (["Res%s" % i for i in range(runcount)] +
-                       ([None] * (len(job.ops) - runcount)))])
-
-    # Must have been written and replicated
-    self.assertEqual(queue.GetNextUpdate(), (job, True))
-    self.assertRaises(IndexError, queue.GetNextUpdate)
-
-  def testQueueShutdownWhileRunning(self):
-    # Tests shutting down the queue while a job is running
-    queue = _FakeQueueForProc()
-
-    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
-           for i in range(3)]
-
-    # Create job
-    job_id = 2718211587
-    job = self._CreateJob(queue, job_id, ops)
-
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-
-    opexec = _FakeExecOpCodeForProc(queue, None, None)
-
-    self.assertRaises(IndexError, queue.GetNextUpdate)
-
-    # Run one opcode
-    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
-                     jqueue._JobProcessor.DEFER)
-
-    # Job goes back to queued
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
-                     [[constants.OP_STATUS_SUCCESS,
-                       constants.OP_STATUS_QUEUED,
-                       constants.OP_STATUS_QUEUED],
-                      ["Res0", None, None]])
-    self.assertFalse(job.cur_opctx)
-
-    # Writes for waiting, running and result
-    for _ in range(3):
-      self.assertEqual(queue.GetNextUpdate(), (job, True))
-
-    # Run second opcode
-    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
-                     jqueue._JobProcessor.DEFER)
-
-    # Job goes back to queued
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
-                     [[constants.OP_STATUS_SUCCESS,
-                       constants.OP_STATUS_SUCCESS,
-                       constants.OP_STATUS_QUEUED],
-                      ["Res0", "Res1", None]])
-    self.assertFalse(job.cur_opctx)
-
-    # Writes for waiting, running and result
-    for _ in range(3):
-      self.assertEqual(queue.GetNextUpdate(), (job, True))
-
-    self._TestQueueShutdown(queue, opexec, job, 2)
-
-  def testQueueShutdownWithLockTimeout(self):
-    # Tests shutting down while a lock acquire times out
-    queue = _FakeQueueForProc()
-
-    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
-           for i in range(3)]
-
-    # Create job
-    job_id = 1304231178
-    job = self._CreateJob(queue, job_id, ops)
-
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-
-    acquire_timeout = False
-
-    def _BeforeStart(timeout, priority):
-      self.assertFalse(queue.IsAcquired())
-      self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
-      if acquire_timeout:
-        raise mcpu.LockAcquireTimeout()
-
-    opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, None)
-
-    self.assertRaises(IndexError, queue.GetNextUpdate)
-
-    # Run one opcode
-    self.assertEqual(jqueue._JobProcessor(queue, opexec, job)(),
-                     jqueue._JobProcessor.DEFER)
-
-    # Job goes back to queued
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-    self.assertEqual(job.GetInfo(["opstatus", "opresult"]),
-                     [[constants.OP_STATUS_SUCCESS,
-                       constants.OP_STATUS_QUEUED,
-                       constants.OP_STATUS_QUEUED],
-                      ["Res0", None, None]])
-    self.assertFalse(job.cur_opctx)
-
-    # Writes for waiting, running and result
-    for _ in range(3):
-      self.assertEqual(queue.GetNextUpdate(), (job, True))
-
-    # The next opcode should have expiring lock acquires
-    acquire_timeout = True
-
-    self._TestQueueShutdown(queue, opexec, job, 1)
-
-  def testQueueShutdownWhileInQueue(self):
-    # This should never happen in reality (no new jobs are started by the
-    # workerpool once a shutdown has been initiated), but it's better to test
-    # the job processor for this scenario
-    queue = _FakeQueueForProc()
-
-    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
-           for i in range(5)]
-
-    # Create job
-    job_id = 2031
-    job = self._CreateJob(queue, job_id, ops)
-
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-    self.assertRaises(IndexError, queue.GetNextUpdate)
-
-    self.assertFalse(job.start_timestamp)
-    self.assertFalse(job.end_timestamp)
-    self.assertTrue(compat.all(op.status == constants.OP_STATUS_QUEUED
-                               for op in job.ops))
-
-    opexec = _FakeExecOpCodeForProc(queue, None, None)
-    self._TestQueueShutdown(queue, opexec, job, 0)
-
-  def testQueueShutdownWhileWaitlockInQueue(self):
-    queue = _FakeQueueForProc()
-
-    ops = [opcodes.OpTestDummy(result="Res%s" % i, fail=False)
-           for i in range(5)]
-
-    # Create job
-    job_id = 53125685
-    job = self._CreateJob(queue, job_id, ops)
-
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
-
-    job.ops[0].status = constants.OP_STATUS_WAITING
-
-    assert len(job.ops) == 5
-
-    self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
-
-    self.assertRaises(IndexError, queue.GetNextUpdate)
-
-    opexec = _FakeExecOpCodeForProc(queue, None, None)
-    self._TestQueueShutdown(queue, opexec, job, 0)
-
   def testPartiallyRun(self):
     # Tests calling the processor on a job that's been partially run before the
     # program was restarted
-- 
2.0.0.526.g5318336

Reply via email to