On Thu, May 21, 2015 at 04:00:04PM +0200, 'Klaus Aehlig' via ganeti-devel wrote:
It is no longer needed, now that the only job is handled
in the main thread. The only purpose of the job queue now
is to pass contextual information around.

Signed-off-by: Klaus Aehlig <[email protected]>
---
lib/jqueue/__init__.py            | 248 +-------------------------------------
test/py/ganeti.jqueue_unittest.py | 139 +--------------------
2 files changed, 4 insertions(+), 383 deletions(-)
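
For context, a minimal, self-contained sketch of the model this change moves to: one job processed synchronously in the calling (main) thread, with no worker pool in between. All names below are illustrative stand-ins, not Ganeti's actual jqueue API.

```python
# Illustrative only: stand-in types, not Ganeti's jqueue classes.

class JobProcessor(object):
    """Runs all opcodes of a single job in the calling thread."""

    FINISHED, DEFER, WAITDEP = range(3)

    def __init__(self, job):
        self._job = job

    def __call__(self):
        # No worker threads: each opcode executes right here, in order.
        for opcode in self._job["ops"]:
            opcode()
        return self.FINISHED


def run_single_job(job):
    """Process one job synchronously; the process handles exactly one job."""
    result = JobProcessor(job)()
    if result != JobProcessor.FINISHED:
        raise RuntimeError("unexpected processor result: %r" % (result,))
    return result


if __name__ == "__main__":
    # Two trivial opcodes standing in for real cluster operations.
    demo_job = {"ops": [lambda: None, lambda: None]}
    run_single_job(demo_job)
```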

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index cc3e4f3..2fdad91 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -33,9 +33,6 @@
Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.

-@var JOBQUEUE_THREADS: the number of worker threads we start for
-    processing jobs
-
"""

import logging
@@ -56,7 +53,6 @@ except ImportError:
from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
-from ganeti import workerpool
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
@@ -77,8 +73,6 @@ from ganeti import vcluster
from ganeti.cmdlib import cluster


-JOBQUEUE_THREADS = 1
-
# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
@@ -1147,114 +1141,6 @@ class _JobProcessor(object):
      queue.release()


-def _EvaluateJobProcessorResult(depmgr, job, result):
-  """Looks at a result from L{_JobProcessor} for a job.
-
-  To be used in a L{_JobQueueWorker}.
-
-  """
-  if result == _JobProcessor.FINISHED:
-    # Notify waiting jobs
-    depmgr.NotifyWaiters(job.id)
-
-  elif result == _JobProcessor.DEFER:
-    # Schedule again
-    raise workerpool.DeferTask(priority=job.CalcPriority())
-
-  elif result == _JobProcessor.WAITDEP:
-    # No-op, dependency manager will re-schedule
-    pass
-
-  else:
-    raise errors.ProgrammerError("Job processor returned unknown status %s" %
-                                 (result, ))
-
-
-class _JobQueueWorker(workerpool.BaseWorker):
-  """The actual job workers.
-
-  """
-  def RunTask(self, job): # pylint: disable=W0221
-    """Job executor.
-
-    @type job: L{_QueuedJob}
-    @param job: the job to be processed
-
-    """
-    assert job.writable, "Expected writable job"
-
-    # Ensure only one worker is active on a single job. If a job registers for
-    # a dependency job, and the other job notifies before the first worker is
-    # done, the job can end up in the tasklist more than once.
-    job.processor_lock.acquire()
-    try:
-      return self._RunTaskInner(job)
-    finally:
-      job.processor_lock.release()
-
-  def _RunTaskInner(self, job):
-    """Executes a job.
-
-    Must be called with per-job lock acquired.
-
-    """
-    queue = job.queue
-    assert queue == self.pool.queue
-
-    setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
-    setname_fn(None)
-
-    proc = mcpu.Processor(queue.context, job.id)
-
-    # Create wrapper for setting thread name
-    wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
-                                    proc.ExecOpCode)
-
-    _EvaluateJobProcessorResult(queue.depmgr, job,
-                                _JobProcessor(queue, wrap_execop_fn, job)())
-
-  @staticmethod
-  def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
-    """Updates the worker thread name to include a short summary of the opcode.
-
-    @param setname_fn: Callable setting worker thread name
-    @param execop_fn: Callable for executing opcode (usually
-                      L{mcpu.Processor.ExecOpCode})
-
-    """
-    setname_fn(op)
-    try:
-      return execop_fn(op, *args, **kwargs)
-    finally:
-      setname_fn(None)
-
-  @staticmethod
-  def _GetWorkerName(job, op):
-    """Sets the worker thread name.
-
-    @type job: L{_QueuedJob}
-    @type op: L{opcodes.OpCode}
-
-    """
-    parts = ["Job%s" % job.id]
-
-    if op:
-      parts.append(op.TinySummary())
-
-    return "/".join(parts)
-
-
-class _JobQueueWorkerPool(workerpool.WorkerPool):
-  """Simple class implementing a job-processing workerpool.
-
-  """
-  def __init__(self, queue):
-    super(_JobQueueWorkerPool, self).__init__("Jq",
-                                              JOBQUEUE_THREADS,
-                                              _JobQueueWorker)
-    self.queue = queue
-
-
class _JobDependencyManager:
  """Keeps track of job dependencies.

@@ -1265,12 +1151,11 @@ class _JobDependencyManager:
   CONTINUE,
   WRONGSTATUS) = range(1, 6)

-  def __init__(self, getstatus_fn, enqueue_fn):
+  def __init__(self, getstatus_fn):
    """Initializes this class.

    """
    self._getstatus_fn = getstatus_fn
-    self._enqueue_fn = enqueue_fn

    self._waiters = {}
    self._lock = locking.SharedLock("JobDepMgr")
@@ -1365,31 +1250,6 @@ class _JobDependencyManager:
                   if not waiters]:
      del self._waiters[job_id]

-  def NotifyWaiters(self, job_id):
-    """Notifies all jobs waiting for a certain job ID.
-
-    @attention: Do not call until L{CheckAndRegister} returned a status other
-      than C{WAITDEP} for C{job_id}, or behaviour is undefined
-    @type job_id: int
-    @param job_id: Job ID
-
-    """
-    assert ht.TJobId(job_id)
-
-    self._lock.acquire()
-    try:
-      self._RemoveEmptyWaitersUnlocked()
-
-      jobs = self._waiters.pop(job_id, None)
-    finally:
-      self._lock.release()
-
-    if jobs:
-      # Re-add jobs to workerpool
-      logging.debug("Re-adding %s jobs which were waiting for job %s",
-                    len(jobs), job_id)
-      self._enqueue_fn(jobs)
-

class JobQueue(object):
  """Queue used to manage the jobs.
@@ -1438,59 +1298,7 @@ class JobQueue(object):
    assert ht.TInt(self._queue_size)

    # Job dependencies
-    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
-                                        self._EnqueueJobs)
-
-    # Setup worker pool
-    self._wpool = _JobQueueWorkerPool(self)
-
-  def _PickupJobUnlocked(self, job_id):
-    """Load a job from the job queue
-
-    Pick up a job that already is in the job queue and start/resume it.
-
-    """
-    if self.primary_jid:
-      logging.warning("Job process asked to pick up %s, but already has %s",
-                      job_id, self.primary_jid)
-
-    self.primary_jid = int(job_id)
-
-    job = self._LoadJobUnlocked(job_id)
-
-    if job is None:
-      logging.warning("Job %s could not be read", job_id)
-      return
-
-    job.AddReasons(pickup=True)
-
-    status = job.CalcStatus()
-    if status == constants.JOB_STATUS_QUEUED:
-      job.SetPid(os.getpid())
-      self._EnqueueJobsUnlocked([job])
-      logging.info("Restarting job %s", job.id)
-
-    elif status in (constants.JOB_STATUS_RUNNING,
-                    constants.JOB_STATUS_WAITING,
-                    constants.JOB_STATUS_CANCELING):
-      logging.warning("Unfinished job %s found: %s", job.id, job)
-
-      if status == constants.JOB_STATUS_WAITING:
-        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
-        job.SetPid(os.getpid())
-        self._EnqueueJobsUnlocked([job])
-        logging.info("Restarting job %s", job.id)
-      else:
-        to_encode = errors.OpExecError("Unclean master daemon shutdown")
-        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
-                              _EncodeOpError(to_encode))
-        job.Finalize()
-
-    self.UpdateJobUnlocked(job)
-
-  @locking.ssynchronized(_LOCK)
-  def PickupJob(self, job_id):
-    self._PickupJobUnlocked(job_id)
+    self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)

  def _GetRpc(self, address_list):
    """Gets RPC runner with context.
@@ -1871,28 +1679,6 @@ class JobQueue(object):

    return (True, result)

-  @locking.ssynchronized(_LOCK)
-  def _EnqueueJobs(self, jobs):
-    """Helper function to add jobs to worker pool's queue.
-
-    @type jobs: list
-    @param jobs: List of all jobs
-
-    """
-    return self._EnqueueJobsUnlocked(jobs)
-
-  def _EnqueueJobsUnlocked(self, jobs):
-    """Helper function to add jobs to worker pool's queue.
-
-    @type jobs: list
-    @param jobs: List of all jobs
-
-    """
-    assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
-    self._wpool.AddManyTasks([(job, ) for job in jobs],
-                             priority=[job.CalcPriority() for job in jobs],
-                             task_id=map(_GetIdAttr, jobs))
-
  def _GetJobStatusForDependencies(self, job_id):
    """Gets the status of a job for dependencies.

@@ -1993,13 +1779,6 @@ class JobQueue(object):

    def fn(job):
      (success, msg) = job.ChangePriority(priority)
-
-      if success:
-        try:
-          self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
-        except workerpool.NoSuchTask:
-          logging.debug("Job %s is not in workerpool at this time", job.id)
-
      return (success, msg)

    return self._ModifyJobUnlocked(job_id, fn)
@@ -2030,26 +1809,3 @@ class JobQueue(object):
      self.UpdateJobUnlocked(job)

    return (success, msg)
-
-  @locking.ssynchronized(_LOCK)
-  def PrepareShutdown(self):
-    """Prepare to stop the job queue.
-
-    Returns whether there are any jobs currently running. If the latter is the
-    case, the job queue is not yet ready for shutdown. Once this function
-    returns C{True} L{Shutdown} can be called without interfering with any job.
-
-    @rtype: bool
-    @return: Whether there are any running jobs
-
-    """
-    return self._wpool.HasRunningTasks()
-
-  @locking.ssynchronized(_LOCK)
-  def Shutdown(self):
-    """Stops the job queue.
-
-    This shutdowns all the worker threads an closes the queue.
-
-    """
-    self._wpool.TerminateWorkers()
diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py
index e249057..4080662 100755
--- a/test/py/ganeti.jqueue_unittest.py
+++ b/test/py/ganeti.jqueue_unittest.py
@@ -1531,43 +1531,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
    self.assertRaises(IndexError, queue.GetNextUpdate)


-class TestEvaluateJobProcessorResult(unittest.TestCase):
-  def testFinished(self):
-    depmgr = _FakeDependencyManager()
-    job = _IdOnlyFakeJob(30953)
-    jqueue._EvaluateJobProcessorResult(depmgr, job,
-                                       jqueue._JobProcessor.FINISHED)
-    self.assertEqual(depmgr.GetNextNotification(), job.id)
-    self.assertRaises(IndexError, depmgr.GetNextNotification)
-
-  def testDefer(self):
-    depmgr = _FakeDependencyManager()
-    job = _IdOnlyFakeJob(11326, priority=5463)
-    try:
-      jqueue._EvaluateJobProcessorResult(depmgr, job,
-                                         jqueue._JobProcessor.DEFER)
-    except workerpool.DeferTask, err:
-      self.assertEqual(err.priority, 5463)
-    else:
-      self.fail("Didn't raise exception")
-    self.assertRaises(IndexError, depmgr.GetNextNotification)
-
-  def testWaitdep(self):
-    depmgr = _FakeDependencyManager()
-    job = _IdOnlyFakeJob(21317)
-    jqueue._EvaluateJobProcessorResult(depmgr, job,
-                                       jqueue._JobProcessor.WAITDEP)
-    self.assertRaises(IndexError, depmgr.GetNextNotification)
-
-  def testOther(self):
-    depmgr = _FakeDependencyManager()
-    job = _IdOnlyFakeJob(5813)
-    self.assertRaises(errors.ProgrammerError,
-                      jqueue._EvaluateJobProcessorResult,
-                      depmgr, job, "Other result")
-    self.assertRaises(IndexError, depmgr.GetNextNotification)
-
-
class _FakeTimeoutStrategy:
  def __init__(self, timeouts):
    self.timeouts = timeouts
@@ -1882,19 +1845,13 @@ class TestJobDependencyManager(unittest.TestCase):
  def setUp(self):
    self._status = []
    self._queue = []
-    self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
+    self.jdm = jqueue._JobDependencyManager(self._GetStatus)

  def _GetStatus(self, job_id):
    (exp_job_id, result) = self._status.pop(0)
    self.assertEqual(exp_job_id, job_id)
    return result

-  def _Enqueue(self, jobs):
-    self.assertFalse(self.jdm._lock.is_owned(),
-                     msg=("Must not own manager lock while re-adding jobs"
-                          " (potential deadlock)"))
-    self._queue.append(jobs)
-
  def testNotFinalizedThenCancel(self):
    job = _IdOnlyFakeJob(17697)
    job_id = str(28625)
@@ -2019,26 +1976,6 @@ class TestJobDependencyManager(unittest.TestCase):
      self.assertFalse(self.jdm.JobWaiting(job))
      self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))

-  def testNotify(self):
-    job = _IdOnlyFakeJob(8227)
-    job_id = str(4113)
-
-    self._status.append((job_id, constants.JOB_STATUS_RUNNING))
-    (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
-    self.assertEqual(result, self.jdm.WAIT)
-    self.assertFalse(self._status)
-    self.assertFalse(self._queue)
-    self.assertTrue(self.jdm.JobWaiting(job))
-    self.assertEqual(self.jdm._waiters, {
-      job_id: set([job]),
-      })
-
-    self.jdm.NotifyWaiters(job_id)
-    self.assertFalse(self._status)
-    self.assertFalse(self.jdm._waiters)
-    self.assertFalse(self.jdm.JobWaiting(job))
-    self.assertEqual(self._queue, [set([job])])
-
  def testWrongStatus(self):
    job = _IdOnlyFakeJob(10102)
    job_id = str(1271)
@@ -2100,78 +2037,6 @@ class TestJobDependencyManager(unittest.TestCase):
      job_id: set(),
      })

-    # Force cleanup
-    self.jdm.NotifyWaiters("0")
-    self.assertFalse(self.jdm._waiters)
-    self.assertFalse(self._status)
-    self.assertFalse(self._queue)
-
-  def testMultipleWaiting(self):
-    # Use a deterministic random generator
-    rnd = random.Random(21402)
-
-    job_ids = map(str, rnd.sample(range(1, 10000), 150))
-
-    waiters = dict((job_ids.pop(),
-                    set(map(_IdOnlyFakeJob,
-                            [job_ids.pop()
-                             for _ in range(rnd.randint(1, 20))])))
-                   for _ in range(10))
-
-    # Ensure there are no duplicate job IDs
-    assert not utils.FindDuplicates(waiters.keys() +
-                                    [job.id
-                                     for jobs in waiters.values()
-                                     for job in jobs])
-
-    # Register all jobs as waiters
-    for job_id, job in [(job_id, job)
-                        for (job_id, jobs) in waiters.items()
-                        for job in jobs]:
-      self._status.append((job_id, constants.JOB_STATUS_QUEUED))
-      (result, _) = self.jdm.CheckAndRegister(job, job_id,
-                                              [constants.JOB_STATUS_SUCCESS])
-      self.assertEqual(result, self.jdm.WAIT)
-      self.assertFalse(self._status)
-      self.assertFalse(self._queue)
-      self.assertTrue(self.jdm.JobWaiting(job))
-
-    self.assertEqual(self.jdm._waiters, waiters)
-
-    def _MakeSet((name, mode, owner_names, pending)):
-      return (name, mode, owner_names,
-              [(pendmode, set(pend)) for (pendmode, pend) in pending])
-
-    def _CheckLockInfo():
-      info = self.jdm.GetLockInfo([query.LQ_PENDING])
-      self.assertEqual(sorted(map(_MakeSet, info)), sorted([
-        ("job/%s" % job_id, None, None,
-         [("job", set([job.id for job in jobs]))])
-        for job_id, jobs in waiters.items()
-        if jobs
-        ]))
-
-    _CheckLockInfo()
-
-    # Notify in random order
-    for job_id in rnd.sample(waiters, len(waiters)):
-      # Remove from pending waiter list
-      jobs = waiters.pop(job_id)
-      for job in jobs:
-        self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
-        (result, _) = self.jdm.CheckAndRegister(job, job_id,
-                                                [constants.JOB_STATUS_SUCCESS])
-        self.assertEqual(result, self.jdm.CONTINUE)
-        self.assertFalse(self._status)
-        self.assertFalse(self._queue)
-        self.assertFalse(self.jdm.JobWaiting(job))
-
-      _CheckLockInfo()
-
-    self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
-
-    assert not waiters
-
  def testSelfDependency(self):
    job = _IdOnlyFakeJob(18937)

@@ -2186,7 +2051,7 @@ class TestJobDependencyManager(unittest.TestCase):
    def _FakeStatus(_):
      raise errors.JobLost("#msg#")

-    jdm = jqueue._JobDependencyManager(_FakeStatus, None)
+    jdm = jqueue._JobDependencyManager(_FakeStatus)
    (result, _) = jdm.CheckAndRegister(job, job_id, [])
    self.assertEqual(result, self.jdm.ERROR)
    self.assertFalse(jdm.JobWaiting(job))
--
2.2.0.rc0.207.ga3a616c


LGTM
