Given that each process now runs only a single job, there is no longer
any need to keep track of whether the worker pool is active. Hence drop
the corresponding code.

The only active use of the SetActive functionality is in the tests;
these tests, however, test priority handling in the worker pool,
which is not that important for single-worker worker pools anyway.
So we just remove the tests.

Signed-off-by: Klaus Aehlig <[email protected]>
---
 lib/jqueue/__init__.py                |  11 +-
 lib/workerpool.py                     |  25 +---
 test/py/ganeti.workerpool_unittest.py | 250 ----------------------------------
 3 files changed, 4 insertions(+), 282 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index 89a745c..0258f9d 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -2504,11 +2504,9 @@ class JobQueue(object):
   def PrepareShutdown(self):
     """Prepare to stop the job queue.
 
-    Disables execution of jobs in the workerpool and returns whether there are
-    any jobs currently running. If the latter is the case, the job queue is not
-    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
-    be called without interfering with any job. Queued and unfinished jobs will
-    be resumed next time.
+    Returns whether there are any jobs currently running. If so, the job queue
+    is not yet ready for shutdown. Once this function returns C{False},
+    L{Shutdown} can be called without interfering with any job.
 
     @rtype: bool
     @return: Whether there are any running jobs
@@ -2517,9 +2515,6 @@ class JobQueue(object):
     if self._accepting_jobs:
       self._accepting_jobs = False
 
-      # Tell worker pool to stop processing pending tasks
-      self._wpool.SetActive(False)
-
     return self._wpool.HasRunningTasks()
 
   def AcceptingJobsUnlocked(self):
diff --git a/lib/workerpool.py b/lib/workerpool.py
index 6b558ce..734530a 100644
--- a/lib/workerpool.py
+++ b/lib/workerpool.py
@@ -281,7 +281,6 @@ class WorkerPool(object):
     self._last_worker_id = 0
     self._workers = []
     self._quiescing = False
-    self._active = True
 
     # Terminating workers
     self._termworkers = []
@@ -447,28 +446,6 @@ class WorkerPool(object):
     finally:
       self._lock.release()
 
-  def SetActive(self, active):
-    """Enable/disable processing of tasks.
-
-    This is different from L{Quiesce} in the sense that this function just
-    changes an internal flag and doesn't wait for the queue to be empty. Tasks
-    already being processed continue normally, but no new tasks will be
-    started. New tasks can still be added.
-
-    @type active: bool
-    @param active: Whether tasks should be processed
-
-    """
-    self._lock.acquire()
-    try:
-      self._active = active
-
-      if active:
-        # Tell all workers to continue processing
-        self._pool_to_worker.notifyAll()
-    finally:
-      self._lock.release()
-
   def _WaitForTaskUnlocked(self, worker):
     """Waits for a task for a worker.
 
@@ -481,7 +458,7 @@ class WorkerPool(object):
         return _TERMINATE
 
       # If there's a pending task, return it immediately
-      if self._active and self._tasks:
+      if self._tasks:
         # Get task from queue and tell pool about it
         try:
           task = heapq.heappop(self._tasks)
diff --git a/test/py/ganeti.workerpool_unittest.py b/test/py/ganeti.workerpool_unittest.py
index 8f35a69..a2cc476 100755
--- a/test/py/ganeti.workerpool_unittest.py
+++ b/test/py/ganeti.workerpool_unittest.py
@@ -196,53 +196,6 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
-  def testActive(self):
-    ctx = CountingContext()
-    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
-    try:
-      self._CheckWorkerCount(wp, 5)
-      self.assertTrue(wp._active)
-
-      # Process some tasks
-      for _ in range(10):
-        wp.AddTask((ctx, None))
-
-      wp.Quiesce()
-      self._CheckNoTasks(wp)
-      self.assertEquals(ctx.GetDoneTasks(), 10)
-
-      # Repeat a few times
-      for count in range(10):
-        # Deactivate pool
-        wp.SetActive(False)
-        self._CheckNoTasks(wp)
-
-        # Queue some more tasks
-        for _ in range(10):
-          wp.AddTask((ctx, None))
-
-        for _ in range(5):
-          # Short delays to give other threads a chance to cause breakage
-          time.sleep(.01)
-          wp.AddTask((ctx, "Hello world %s" % 999))
-          self.assertFalse(wp._active)
-
-        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
-
-        # Start processing again
-        wp.SetActive(True)
-        self.assertTrue(wp._active)
-
-        # Wait for tasks to finish
-        wp.Quiesce()
-        self._CheckNoTasks(wp)
-        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
-
-        self._CheckWorkerCount(wp, 5)
-    finally:
-      wp.TerminateWorkers()
-      self._CheckWorkerCount(wp, 0)
-
   def testChecksum(self):
     # Tests whether all tasks are run and, since we're only using a single
     # thread, whether everything is started in order.
@@ -523,209 +476,6 @@ class TestWorkerpool(unittest.TestCase):
       wp.TerminateWorkers()
       self._CheckWorkerCount(wp, 0)
 
-  def testChangeTaskPriority(self):
-    wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
-    try:
-      self._CheckWorkerCount(wp, 1)
-
-      ctx = PriorityContext()
-
-      # Use static seed for this test
-      rnd = random.Random(4727)
-
-      # Disable processing of tasks
-      wp.SetActive(False)
-
-      # No task ID
-      self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
-                        None, 0)
-
-      # Pre-generate task IDs and priorities
-      count = 100
-      task_ids = range(0, count)
-      priorities = range(200, 200 + count) * 2
-
-      rnd.shuffle(task_ids)
-      rnd.shuffle(priorities)
-
-      # Make sure there are some duplicate priorities, but not all
-      priorities[count * 2 - 10:count * 2 - 1] = \
-        priorities[count - 10: count - 1]
-
-      assert len(priorities) == 2 * count
-      assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
-
-      # Add some tasks; this loop consumes the first half of all previously
-      # generated priorities
-      for (idx, task_id) in enumerate(task_ids):
-        wp.AddTask((ctx, idx),
-                   priority=priorities.pop(),
-                   task_id=task_id)
-
-      self.assertEqual(len(wp._tasks), len(task_ids))
-      self.assertEqual(len(wp._taskdata), len(task_ids))
-
-      # Tasks have been added, so half of the priorities should have been
-      # consumed
-      assert len(priorities) == len(task_ids)
-
-      # Change task priority
-      expected = []
-      for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
-        wp.ChangeTaskPriority(task_id, prio)
-        expected.append((prio, idx))
-
-      self.assertEqual(len(wp._taskdata), len(task_ids))
-
-      # Half the entries are now abandoned tasks
-      self.assertEqual(len(wp._tasks), len(task_ids) * 2)
-
-      assert len(priorities) == count
-      assert len(task_ids) == count
-
-      # Start processing
-      wp.SetActive(True)
-
-      # Wait for tasks to finish
-      wp.Quiesce()
-
-      self._CheckNoTasks(wp)
-
-      for task_id in task_ids:
-        # All tasks are done
-        self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
-                          task_id, 0)
-
-      # Check result
-      ctx.lock.acquire()
-      try:
-        self.assertEqual(ctx.result, sorted(expected))
-      finally:
-        ctx.lock.release()
-
-      self._CheckWorkerCount(wp, 1)
-    finally:
-      wp.TerminateWorkers()
-      self._CheckWorkerCount(wp, 0)
-
-  def testChangeTaskPriorityInteralStructures(self):
-    wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
-    try:
-      self._CheckWorkerCount(wp, 1)
-
-      # Use static seed for this test
-      rnd = random.Random(643)
-
-      (num1, num2) = rnd.sample(range(1000), 2)
-
-      # Disable processing of tasks
-      wp.SetActive(False)
-
-      self.assertFalse(wp._tasks)
-      self.assertFalse(wp._taskdata)
-
-      # No priority or task ID
-      wp.AddTask(())
-      self.assertEqual(wp._tasks, [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        ])
-      self.assertFalse(wp._taskdata)
-
-      # No task ID
-      wp.AddTask((), priority=7413)
-      self.assertEqual(wp._tasks, [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        [7413, 1, None, ()],
-        ])
-      self.assertFalse(wp._taskdata)
-
-      # Start adding real tasks
-      wp.AddTask((), priority=10267659, task_id=num1)
-      self.assertEqual(wp._tasks, [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        [7413, 1, None, ()],
-        [10267659, 2, num1, ()],
-        ])
-      self.assertEqual(wp._taskdata, {
-        num1: [10267659, 2, num1, ()],
-        })
-
-      wp.AddTask((), priority=123, task_id=num2)
-      self.assertEqual(sorted(wp._tasks), [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        [123, 3, num2, ()],
-        [7413, 1, None, ()],
-        [10267659, 2, num1, ()],
-        ])
-      self.assertEqual(wp._taskdata, {
-        num1: [10267659, 2, num1, ()],
-        num2: [123, 3, num2, ()],
-        })
-
-      wp.ChangeTaskPriority(num1, 100)
-      self.assertEqual(sorted(wp._tasks), [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        [100, 2, num1, ()],
-        [123, 3, num2, ()],
-        [7413, 1, None, ()],
-        [10267659, 2, num1, None],
-        ])
-      self.assertEqual(wp._taskdata, {
-        num1: [100, 2, num1, ()],
-        num2: [123, 3, num2, ()],
-        })
-
-      wp.ChangeTaskPriority(num2, 91337)
-      self.assertEqual(sorted(wp._tasks), [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        [100, 2, num1, ()],
-        [123, 3, num2, None],
-        [7413, 1, None, ()],
-        [91337, 3, num2, ()],
-        [10267659, 2, num1, None],
-        ])
-      self.assertEqual(wp._taskdata, {
-        num1: [100, 2, num1, ()],
-        num2: [91337, 3, num2, ()],
-        })
-
-      wp.ChangeTaskPriority(num1, 10139)
-      self.assertEqual(sorted(wp._tasks), [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        [100, 2, num1, None],
-        [123, 3, num2, None],
-        [7413, 1, None, ()],
-        [10139, 2, num1, ()],
-        [91337, 3, num2, ()],
-        [10267659, 2, num1, None],
-        ])
-      self.assertEqual(wp._taskdata, {
-        num1: [10139, 2, num1, ()],
-        num2: [91337, 3, num2, ()],
-        })
-
-      # Change to the same priority once again
-      wp.ChangeTaskPriority(num1, 10139)
-      self.assertEqual(sorted(wp._tasks), [
-        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
-        [100, 2, num1, None],
-        [123, 3, num2, None],
-        [7413, 1, None, ()],
-        [10139, 2, num1, None],
-        [10139, 2, num1, ()],
-        [91337, 3, num2, ()],
-        [10267659, 2, num1, None],
-        ])
-      self.assertEqual(wp._taskdata, {
-        num1: [10139, 2, num1, ()],
-        num2: [91337, 3, num2, ()],
-        })
-
-      self._CheckWorkerCount(wp, 1)
-    finally:
-      wp.TerminateWorkers()
-      self._CheckWorkerCount(wp, 0)
-
 
 if __name__ == "__main__":
   testutils.GanetiTestProgram()
-- 
2.0.0.526.g5318336

Reply via email to