LGTM

On Wed, Jun 11, 2014 at 6:05 PM, 'Klaus Aehlig' via ganeti-devel <
[email protected]> wrote:

> Given that each process now runs only a single job, there is no longer
> any need to keep track of whether the worker pool is active. Hence drop
> the corresponding code.
>
> The only active use of the SetActive functionality is in the tests;
> these tests, however, test priority handling in the worker pool,
> which is not that important for single-worker worker pools anyway.
> So we just remove the tests.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/jqueue/__init__.py                |  11 +-
>  lib/workerpool.py                     |  25 +---
>  test/py/ganeti.workerpool_unittest.py | 250 ----------------------------------
>  3 files changed, 4 insertions(+), 282 deletions(-)
>
> diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
> index 89a745c..0258f9d 100644
> --- a/lib/jqueue/__init__.py
> +++ b/lib/jqueue/__init__.py
> @@ -2504,11 +2504,9 @@ class JobQueue(object):
>    def PrepareShutdown(self):
>      """Prepare to stop the job queue.
>
> -    Disables execution of jobs in the workerpool and returns whether there are
> -    any jobs currently running. If the latter is the case, the job queue is not
> -    yet ready for shutdown. Once this function returns C{True} L{Shutdown} can
> -    be called without interfering with any job. Queued and unfinished jobs will
> -    be resumed next time.
> +    Returns whether there are any jobs currently running. If the latter is the
> +    case, the job queue is not yet ready for shutdown. Once this function
> +    returns C{True} L{Shutdown} can be called without interfering with any job.
>
>      @rtype: bool
>      @return: Whether there are any running jobs
> @@ -2517,9 +2515,6 @@ class JobQueue(object):
>      if self._accepting_jobs:
>        self._accepting_jobs = False
>
> -      # Tell worker pool to stop processing pending tasks
> -      self._wpool.SetActive(False)
> -
>      return self._wpool.HasRunningTasks()
>
>    def AcceptingJobsUnlocked(self):
> diff --git a/lib/workerpool.py b/lib/workerpool.py
> index 6b558ce..734530a 100644
> --- a/lib/workerpool.py
> +++ b/lib/workerpool.py
> @@ -281,7 +281,6 @@ class WorkerPool(object):
>      self._last_worker_id = 0
>      self._workers = []
>      self._quiescing = False
> -    self._active = True
>
>      # Terminating workers
>      self._termworkers = []
> @@ -447,28 +446,6 @@ class WorkerPool(object):
>      finally:
>        self._lock.release()
>
> -  def SetActive(self, active):
> -    """Enable/disable processing of tasks.
> -
> -    This is different from L{Quiesce} in the sense that this function just
> -    changes an internal flag and doesn't wait for the queue to be empty. Tasks
> -    already being processed continue normally, but no new tasks will be
> -    started. New tasks can still be added.
> -
> -    @type active: bool
> -    @param active: Whether tasks should be processed
> -
> -    """
> -    self._lock.acquire()
> -    try:
> -      self._active = active
> -
> -      if active:
> -        # Tell all workers to continue processing
> -        self._pool_to_worker.notifyAll()
> -    finally:
> -      self._lock.release()
> -
>    def _WaitForTaskUnlocked(self, worker):
>      """Waits for a task for a worker.
>
> @@ -481,7 +458,7 @@ class WorkerPool(object):
>          return _TERMINATE
>
>        # If there's a pending task, return it immediately
> -      if self._active and self._tasks:
> +      if self._tasks:
>          # Get task from queue and tell pool about it
>          try:
>            task = heapq.heappop(self._tasks)
> diff --git a/test/py/ganeti.workerpool_unittest.py b/test/py/ganeti.workerpool_unittest.py
> index 8f35a69..a2cc476 100755
> --- a/test/py/ganeti.workerpool_unittest.py
> +++ b/test/py/ganeti.workerpool_unittest.py
> @@ -196,53 +196,6 @@ class TestWorkerpool(unittest.TestCase):
>        wp.TerminateWorkers()
>        self._CheckWorkerCount(wp, 0)
>
> -  def testActive(self):
> -    ctx = CountingContext()
> -    wp = workerpool.WorkerPool("TestActive", 5, CountingBaseWorker)
> -    try:
> -      self._CheckWorkerCount(wp, 5)
> -      self.assertTrue(wp._active)
> -
> -      # Process some tasks
> -      for _ in range(10):
> -        wp.AddTask((ctx, None))
> -
> -      wp.Quiesce()
> -      self._CheckNoTasks(wp)
> -      self.assertEquals(ctx.GetDoneTasks(), 10)
> -
> -      # Repeat a few times
> -      for count in range(10):
> -        # Deactivate pool
> -        wp.SetActive(False)
> -        self._CheckNoTasks(wp)
> -
> -        # Queue some more tasks
> -        for _ in range(10):
> -          wp.AddTask((ctx, None))
> -
> -        for _ in range(5):
> -          # Short delays to give other threads a chance to cause breakage
> -          time.sleep(.01)
> -          wp.AddTask((ctx, "Hello world %s" % 999))
> -          self.assertFalse(wp._active)
> -
> -        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15))
> -
> -        # Start processing again
> -        wp.SetActive(True)
> -        self.assertTrue(wp._active)
> -
> -        # Wait for tasks to finish
> -        wp.Quiesce()
> -        self._CheckNoTasks(wp)
> -        self.assertEquals(ctx.GetDoneTasks(), 10 + (count * 15) + 15)
> -
> -        self._CheckWorkerCount(wp, 5)
> -    finally:
> -      wp.TerminateWorkers()
> -      self._CheckWorkerCount(wp, 0)
> -
>    def testChecksum(self):
>      # Tests whether all tasks are run and, since we're only using a single
>      # thread, whether everything is started in order.
> @@ -523,209 +476,6 @@ class TestWorkerpool(unittest.TestCase):
>        wp.TerminateWorkers()
>        self._CheckWorkerCount(wp, 0)
>
> -  def testChangeTaskPriority(self):
> -    wp = workerpool.WorkerPool("Test", 1, PriorityWorker)
> -    try:
> -      self._CheckWorkerCount(wp, 1)
> -
> -      ctx = PriorityContext()
> -
> -      # Use static seed for this test
> -      rnd = random.Random(4727)
> -
> -      # Disable processing of tasks
> -      wp.SetActive(False)
> -
> -      # No task ID
> -      self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
> -                        None, 0)
> -
> -      # Pre-generate task IDs and priorities
> -      count = 100
> -      task_ids = range(0, count)
> -      priorities = range(200, 200 + count) * 2
> -
> -      rnd.shuffle(task_ids)
> -      rnd.shuffle(priorities)
> -
> -      # Make sure there are some duplicate priorities, but not all
> -      priorities[count * 2 - 10:count * 2 - 1] = \
> -        priorities[count - 10: count - 1]
> -
> -      assert len(priorities) == 2 * count
> -      assert priorities[0:(count - 1)] != priorities[count:(2 * count - 1)]
> -
> -      # Add some tasks; this loop consumes the first half of all previously
> -      # generated priorities
> -      for (idx, task_id) in enumerate(task_ids):
> -        wp.AddTask((ctx, idx),
> -                   priority=priorities.pop(),
> -                   task_id=task_id)
> -
> -      self.assertEqual(len(wp._tasks), len(task_ids))
> -      self.assertEqual(len(wp._taskdata), len(task_ids))
> -
> -      # Tasks have been added, so half of the priorities should have been
> -      # consumed
> -      assert len(priorities) == len(task_ids)
> -
> -      # Change task priority
> -      expected = []
> -      for ((idx, task_id), prio) in zip(enumerate(task_ids), priorities):
> -        wp.ChangeTaskPriority(task_id, prio)
> -        expected.append((prio, idx))
> -
> -      self.assertEqual(len(wp._taskdata), len(task_ids))
> -
> -      # Half the entries are now abandoned tasks
> -      self.assertEqual(len(wp._tasks), len(task_ids) * 2)
> -
> -      assert len(priorities) == count
> -      assert len(task_ids) == count
> -
> -      # Start processing
> -      wp.SetActive(True)
> -
> -      # Wait for tasks to finish
> -      wp.Quiesce()
> -
> -      self._CheckNoTasks(wp)
> -
> -      for task_id in task_ids:
> -        # All tasks are done
> -        self.assertRaises(workerpool.NoSuchTask, wp.ChangeTaskPriority,
> -                          task_id, 0)
> -
> -      # Check result
> -      ctx.lock.acquire()
> -      try:
> -        self.assertEqual(ctx.result, sorted(expected))
> -      finally:
> -        ctx.lock.release()
> -
> -      self._CheckWorkerCount(wp, 1)
> -    finally:
> -      wp.TerminateWorkers()
> -      self._CheckWorkerCount(wp, 0)
> -
> -  def testChangeTaskPriorityInteralStructures(self):
> -    wp = workerpool.WorkerPool("Test", 1, NotImplementedWorker)
> -    try:
> -      self._CheckWorkerCount(wp, 1)
> -
> -      # Use static seed for this test
> -      rnd = random.Random(643)
> -
> -      (num1, num2) = rnd.sample(range(1000), 2)
> -
> -      # Disable processing of tasks
> -      wp.SetActive(False)
> -
> -      self.assertFalse(wp._tasks)
> -      self.assertFalse(wp._taskdata)
> -
> -      # No priority or task ID
> -      wp.AddTask(())
> -      self.assertEqual(wp._tasks, [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        ])
> -      self.assertFalse(wp._taskdata)
> -
> -      # No task ID
> -      wp.AddTask((), priority=7413)
> -      self.assertEqual(wp._tasks, [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        [7413, 1, None, ()],
> -        ])
> -      self.assertFalse(wp._taskdata)
> -
> -      # Start adding real tasks
> -      wp.AddTask((), priority=10267659, task_id=num1)
> -      self.assertEqual(wp._tasks, [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        [7413, 1, None, ()],
> -        [10267659, 2, num1, ()],
> -        ])
> -      self.assertEqual(wp._taskdata, {
> -        num1: [10267659, 2, num1, ()],
> -        })
> -
> -      wp.AddTask((), priority=123, task_id=num2)
> -      self.assertEqual(sorted(wp._tasks), [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        [123, 3, num2, ()],
> -        [7413, 1, None, ()],
> -        [10267659, 2, num1, ()],
> -        ])
> -      self.assertEqual(wp._taskdata, {
> -        num1: [10267659, 2, num1, ()],
> -        num2: [123, 3, num2, ()],
> -        })
> -
> -      wp.ChangeTaskPriority(num1, 100)
> -      self.assertEqual(sorted(wp._tasks), [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        [100, 2, num1, ()],
> -        [123, 3, num2, ()],
> -        [7413, 1, None, ()],
> -        [10267659, 2, num1, None],
> -        ])
> -      self.assertEqual(wp._taskdata, {
> -        num1: [100, 2, num1, ()],
> -        num2: [123, 3, num2, ()],
> -        })
> -
> -      wp.ChangeTaskPriority(num2, 91337)
> -      self.assertEqual(sorted(wp._tasks), [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        [100, 2, num1, ()],
> -        [123, 3, num2, None],
> -        [7413, 1, None, ()],
> -        [91337, 3, num2, ()],
> -        [10267659, 2, num1, None],
> -        ])
> -      self.assertEqual(wp._taskdata, {
> -        num1: [100, 2, num1, ()],
> -        num2: [91337, 3, num2, ()],
> -        })
> -
> -      wp.ChangeTaskPriority(num1, 10139)
> -      self.assertEqual(sorted(wp._tasks), [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        [100, 2, num1, None],
> -        [123, 3, num2, None],
> -        [7413, 1, None, ()],
> -        [10139, 2, num1, ()],
> -        [91337, 3, num2, ()],
> -        [10267659, 2, num1, None],
> -        ])
> -      self.assertEqual(wp._taskdata, {
> -        num1: [10139, 2, num1, ()],
> -        num2: [91337, 3, num2, ()],
> -        })
> -
> -      # Change to the same priority once again
> -      wp.ChangeTaskPriority(num1, 10139)
> -      self.assertEqual(sorted(wp._tasks), [
> -        [workerpool._DEFAULT_PRIORITY, 0, None, ()],
> -        [100, 2, num1, None],
> -        [123, 3, num2, None],
> -        [7413, 1, None, ()],
> -        [10139, 2, num1, None],
> -        [10139, 2, num1, ()],
> -        [91337, 3, num2, ()],
> -        [10267659, 2, num1, None],
> -        ])
> -      self.assertEqual(wp._taskdata, {
> -        num1: [10139, 2, num1, ()],
> -        num2: [91337, 3, num2, ()],
> -        })
> -
> -      self._CheckWorkerCount(wp, 1)
> -    finally:
> -      wp.TerminateWorkers()
> -      self._CheckWorkerCount(wp, 0)
> -
>
>  if __name__ == "__main__":
>    testutils.GanetiTestProgram()
> --
> 2.0.0.526.g5318336
>
>

Reply via email to