The job queue's worker pool is no longer needed, now that the
only job is handled in the main thread. The only purpose of the
job queue now is to pass contextual information around.
Signed-off-by: Klaus Aehlig <[email protected]>
---
lib/jqueue/__init__.py | 248 +-------------------------------------
test/py/ganeti.jqueue_unittest.py | 139 +--------------------
2 files changed, 4 insertions(+), 383 deletions(-)
diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index cc3e4f3..2fdad91 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -33,9 +33,6 @@
Locking: there's a single, large lock in the L{JobQueue} class. It's
used by all other classes in this module.
-@var JOBQUEUE_THREADS: the number of worker threads we start for
- processing jobs
-
"""
import logging
@@ -56,7 +53,6 @@ except ImportError:
from ganeti import asyncnotifier
from ganeti import constants
from ganeti import serializer
-from ganeti import workerpool
from ganeti import locking
from ganeti import luxi
from ganeti import opcodes
@@ -77,8 +73,6 @@ from ganeti import vcluster
from ganeti.cmdlib import cluster
-JOBQUEUE_THREADS = 1
-
# member lock names to be passed to @ssynchronized decorator
_LOCK = "_lock"
_QUEUE = "_queue"
@@ -1147,114 +1141,6 @@ class _JobProcessor(object):
queue.release()
-def _EvaluateJobProcessorResult(depmgr, job, result):
- """Looks at a result from L{_JobProcessor} for a job.
-
- To be used in a L{_JobQueueWorker}.
-
- """
- if result == _JobProcessor.FINISHED:
- # Notify waiting jobs
- depmgr.NotifyWaiters(job.id)
-
- elif result == _JobProcessor.DEFER:
- # Schedule again
- raise workerpool.DeferTask(priority=job.CalcPriority())
-
- elif result == _JobProcessor.WAITDEP:
- # No-op, dependency manager will re-schedule
- pass
-
- else:
- raise errors.ProgrammerError("Job processor returned unknown status %s" %
- (result, ))
-
-
-class _JobQueueWorker(workerpool.BaseWorker):
- """The actual job workers.
-
- """
- def RunTask(self, job): # pylint: disable=W0221
- """Job executor.
-
- @type job: L{_QueuedJob}
- @param job: the job to be processed
-
- """
- assert job.writable, "Expected writable job"
-
- # Ensure only one worker is active on a single job. If a job registers for
- # a dependency job, and the other job notifies before the first worker is
- # done, the job can end up in the tasklist more than once.
- job.processor_lock.acquire()
- try:
- return self._RunTaskInner(job)
- finally:
- job.processor_lock.release()
-
- def _RunTaskInner(self, job):
- """Executes a job.
-
- Must be called with per-job lock acquired.
-
- """
- queue = job.queue
- assert queue == self.pool.queue
-
- setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op))
- setname_fn(None)
-
- proc = mcpu.Processor(queue.context, job.id)
-
- # Create wrapper for setting thread name
- wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn,
- proc.ExecOpCode)
-
- _EvaluateJobProcessorResult(queue.depmgr, job,
- _JobProcessor(queue, wrap_execop_fn, job)())
-
- @staticmethod
- def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs):
- """Updates the worker thread name to include a short summary of the opcode.
-
- @param setname_fn: Callable setting worker thread name
- @param execop_fn: Callable for executing opcode (usually
- L{mcpu.Processor.ExecOpCode})
-
- """
- setname_fn(op)
- try:
- return execop_fn(op, *args, **kwargs)
- finally:
- setname_fn(None)
-
- @staticmethod
- def _GetWorkerName(job, op):
- """Sets the worker thread name.
-
- @type job: L{_QueuedJob}
- @type op: L{opcodes.OpCode}
-
- """
- parts = ["Job%s" % job.id]
-
- if op:
- parts.append(op.TinySummary())
-
- return "/".join(parts)
-
-
-class _JobQueueWorkerPool(workerpool.WorkerPool):
- """Simple class implementing a job-processing workerpool.
-
- """
- def __init__(self, queue):
- super(_JobQueueWorkerPool, self).__init__("Jq",
- JOBQUEUE_THREADS,
- _JobQueueWorker)
- self.queue = queue
-
-
class _JobDependencyManager:
"""Keeps track of job dependencies.
@@ -1265,12 +1151,11 @@ class _JobDependencyManager:
CONTINUE,
WRONGSTATUS) = range(1, 6)
- def __init__(self, getstatus_fn, enqueue_fn):
+ def __init__(self, getstatus_fn):
"""Initializes this class.
"""
self._getstatus_fn = getstatus_fn
- self._enqueue_fn = enqueue_fn
self._waiters = {}
self._lock = locking.SharedLock("JobDepMgr")
@@ -1365,31 +1250,6 @@ class _JobDependencyManager:
if not waiters]:
del self._waiters[job_id]
- def NotifyWaiters(self, job_id):
- """Notifies all jobs waiting for a certain job ID.
-
- @attention: Do not call until L{CheckAndRegister} returned a status other
- than C{WAITDEP} for C{job_id}, or behaviour is undefined
- @type job_id: int
- @param job_id: Job ID
-
- """
- assert ht.TJobId(job_id)
-
- self._lock.acquire()
- try:
- self._RemoveEmptyWaitersUnlocked()
-
- jobs = self._waiters.pop(job_id, None)
- finally:
- self._lock.release()
-
- if jobs:
- # Re-add jobs to workerpool
- logging.debug("Re-adding %s jobs which were waiting for job %s",
- len(jobs), job_id)
- self._enqueue_fn(jobs)
-
class JobQueue(object):
"""Queue used to manage the jobs.
@@ -1438,59 +1298,7 @@ class JobQueue(object):
assert ht.TInt(self._queue_size)
# Job dependencies
- self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies,
- self._EnqueueJobs)
-
- # Setup worker pool
- self._wpool = _JobQueueWorkerPool(self)
-
- def _PickupJobUnlocked(self, job_id):
- """Load a job from the job queue
-
- Pick up a job that already is in the job queue and start/resume it.
-
- """
- if self.primary_jid:
- logging.warning("Job process asked to pick up %s, but already has %s",
- job_id, self.primary_jid)
-
- self.primary_jid = int(job_id)
-
- job = self._LoadJobUnlocked(job_id)
-
- if job is None:
- logging.warning("Job %s could not be read", job_id)
- return
-
- job.AddReasons(pickup=True)
-
- status = job.CalcStatus()
- if status == constants.JOB_STATUS_QUEUED:
- job.SetPid(os.getpid())
- self._EnqueueJobsUnlocked([job])
- logging.info("Restarting job %s", job.id)
-
- elif status in (constants.JOB_STATUS_RUNNING,
- constants.JOB_STATUS_WAITING,
- constants.JOB_STATUS_CANCELING):
- logging.warning("Unfinished job %s found: %s", job.id, job)
-
- if status == constants.JOB_STATUS_WAITING:
- job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
- job.SetPid(os.getpid())
- self._EnqueueJobsUnlocked([job])
- logging.info("Restarting job %s", job.id)
- else:
- to_encode = errors.OpExecError("Unclean master daemon shutdown")
- job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
- _EncodeOpError(to_encode))
- job.Finalize()
-
- self.UpdateJobUnlocked(job)
-
- @locking.ssynchronized(_LOCK)
- def PickupJob(self, job_id):
- self._PickupJobUnlocked(job_id)
+ self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies)
def _GetRpc(self, address_list):
"""Gets RPC runner with context.
@@ -1871,28 +1679,6 @@ class JobQueue(object):
return (True, result)
- @locking.ssynchronized(_LOCK)
- def _EnqueueJobs(self, jobs):
- """Helper function to add jobs to worker pool's queue.
-
- @type jobs: list
- @param jobs: List of all jobs
-
- """
- return self._EnqueueJobsUnlocked(jobs)
-
- def _EnqueueJobsUnlocked(self, jobs):
- """Helper function to add jobs to worker pool's queue.
-
- @type jobs: list
- @param jobs: List of all jobs
-
- """
- assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode"
- self._wpool.AddManyTasks([(job, ) for job in jobs],
- priority=[job.CalcPriority() for job in jobs],
- task_id=map(_GetIdAttr, jobs))
-
def _GetJobStatusForDependencies(self, job_id):
"""Gets the status of a job for dependencies.
@@ -1993,13 +1779,6 @@ class JobQueue(object):
def fn(job):
(success, msg) = job.ChangePriority(priority)
-
- if success:
- try:
- self._wpool.ChangeTaskPriority(job.id, job.CalcPriority())
- except workerpool.NoSuchTask:
- logging.debug("Job %s is not in workerpool at this time", job.id)
-
return (success, msg)
return self._ModifyJobUnlocked(job_id, fn)
@@ -2030,26 +1809,3 @@ class JobQueue(object):
self.UpdateJobUnlocked(job)
return (success, msg)
-
- @locking.ssynchronized(_LOCK)
- def PrepareShutdown(self):
- """Prepare to stop the job queue.
-
- Returns whether there are any jobs currently running. If the latter is the
- case, the job queue is not yet ready for shutdown. Once this function
- returns C{True} L{Shutdown} can be called without interfering with any job.
-
- @rtype: bool
- @return: Whether there are any running jobs
-
- """
- return self._wpool.HasRunningTasks()
-
- @locking.ssynchronized(_LOCK)
- def Shutdown(self):
- """Stops the job queue.
-
- This shutdowns all the worker threads an closes the queue.
-
- """
- self._wpool.TerminateWorkers()
diff --git a/test/py/ganeti.jqueue_unittest.py
b/test/py/ganeti.jqueue_unittest.py
index e249057..4080662 100755
--- a/test/py/ganeti.jqueue_unittest.py
+++ b/test/py/ganeti.jqueue_unittest.py
@@ -1531,43 +1531,6 @@ class TestJobProcessor(unittest.TestCase,
_JobProcessorTestUtils):
self.assertRaises(IndexError, queue.GetNextUpdate)
-class TestEvaluateJobProcessorResult(unittest.TestCase):
- def testFinished(self):
- depmgr = _FakeDependencyManager()
- job = _IdOnlyFakeJob(30953)
- jqueue._EvaluateJobProcessorResult(depmgr, job,
- jqueue._JobProcessor.FINISHED)
- self.assertEqual(depmgr.GetNextNotification(), job.id)
- self.assertRaises(IndexError, depmgr.GetNextNotification)
-
- def testDefer(self):
- depmgr = _FakeDependencyManager()
- job = _IdOnlyFakeJob(11326, priority=5463)
- try:
- jqueue._EvaluateJobProcessorResult(depmgr, job,
- jqueue._JobProcessor.DEFER)
- except workerpool.DeferTask, err:
- self.assertEqual(err.priority, 5463)
- else:
- self.fail("Didn't raise exception")
- self.assertRaises(IndexError, depmgr.GetNextNotification)
-
- def testWaitdep(self):
- depmgr = _FakeDependencyManager()
- job = _IdOnlyFakeJob(21317)
- jqueue._EvaluateJobProcessorResult(depmgr, job,
- jqueue._JobProcessor.WAITDEP)
- self.assertRaises(IndexError, depmgr.GetNextNotification)
-
- def testOther(self):
- depmgr = _FakeDependencyManager()
- job = _IdOnlyFakeJob(5813)
- self.assertRaises(errors.ProgrammerError,
- jqueue._EvaluateJobProcessorResult,
- depmgr, job, "Other result")
- self.assertRaises(IndexError, depmgr.GetNextNotification)
-
-
class _FakeTimeoutStrategy:
def __init__(self, timeouts):
self.timeouts = timeouts
@@ -1882,19 +1845,13 @@ class TestJobDependencyManager(unittest.TestCase):
def setUp(self):
self._status = []
self._queue = []
- self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue)
+ self.jdm = jqueue._JobDependencyManager(self._GetStatus)
def _GetStatus(self, job_id):
(exp_job_id, result) = self._status.pop(0)
self.assertEqual(exp_job_id, job_id)
return result
- def _Enqueue(self, jobs):
- self.assertFalse(self.jdm._lock.is_owned(),
- msg=("Must not own manager lock while re-adding jobs"
- " (potential deadlock)"))
- self._queue.append(jobs)
-
def testNotFinalizedThenCancel(self):
job = _IdOnlyFakeJob(17697)
job_id = str(28625)
@@ -2019,26 +1976,6 @@ class TestJobDependencyManager(unittest.TestCase):
self.assertFalse(self.jdm.JobWaiting(job))
self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
- def testNotify(self):
- job = _IdOnlyFakeJob(8227)
- job_id = str(4113)
-
- self._status.append((job_id, constants.JOB_STATUS_RUNNING))
- (result, _) = self.jdm.CheckAndRegister(job, job_id, [])
- self.assertEqual(result, self.jdm.WAIT)
- self.assertFalse(self._status)
- self.assertFalse(self._queue)
- self.assertTrue(self.jdm.JobWaiting(job))
- self.assertEqual(self.jdm._waiters, {
- job_id: set([job]),
- })
-
- self.jdm.NotifyWaiters(job_id)
- self.assertFalse(self._status)
- self.assertFalse(self.jdm._waiters)
- self.assertFalse(self.jdm.JobWaiting(job))
- self.assertEqual(self._queue, [set([job])])
-
def testWrongStatus(self):
job = _IdOnlyFakeJob(10102)
job_id = str(1271)
@@ -2100,78 +2037,6 @@ class TestJobDependencyManager(unittest.TestCase):
job_id: set(),
})
- # Force cleanup
- self.jdm.NotifyWaiters("0")
- self.assertFalse(self.jdm._waiters)
- self.assertFalse(self._status)
- self.assertFalse(self._queue)
-
- def testMultipleWaiting(self):
- # Use a deterministic random generator
- rnd = random.Random(21402)
-
- job_ids = map(str, rnd.sample(range(1, 10000), 150))
-
- waiters = dict((job_ids.pop(),
- set(map(_IdOnlyFakeJob,
- [job_ids.pop()
- for _ in range(rnd.randint(1, 20))])))
- for _ in range(10))
-
- # Ensure there are no duplicate job IDs
- assert not utils.FindDuplicates(waiters.keys() +
- [job.id
- for jobs in waiters.values()
- for job in jobs])
-
- # Register all jobs as waiters
- for job_id, job in [(job_id, job)
- for (job_id, jobs) in waiters.items()
- for job in jobs]:
- self._status.append((job_id, constants.JOB_STATUS_QUEUED))
- (result, _) = self.jdm.CheckAndRegister(job, job_id,
- [constants.JOB_STATUS_SUCCESS])
- self.assertEqual(result, self.jdm.WAIT)
- self.assertFalse(self._status)
- self.assertFalse(self._queue)
- self.assertTrue(self.jdm.JobWaiting(job))
-
- self.assertEqual(self.jdm._waiters, waiters)
-
- def _MakeSet((name, mode, owner_names, pending)):
- return (name, mode, owner_names,
- [(pendmode, set(pend)) for (pendmode, pend) in pending])
-
- def _CheckLockInfo():
- info = self.jdm.GetLockInfo([query.LQ_PENDING])
- self.assertEqual(sorted(map(_MakeSet, info)), sorted([
- ("job/%s" % job_id, None, None,
- [("job", set([job.id for job in jobs]))])
- for job_id, jobs in waiters.items()
- if jobs
- ]))
-
- _CheckLockInfo()
-
- # Notify in random order
- for job_id in rnd.sample(waiters, len(waiters)):
- # Remove from pending waiter list
- jobs = waiters.pop(job_id)
- for job in jobs:
- self._status.append((job_id, constants.JOB_STATUS_SUCCESS))
- (result, _) = self.jdm.CheckAndRegister(job, job_id,
- [constants.JOB_STATUS_SUCCESS])
- self.assertEqual(result, self.jdm.CONTINUE)
- self.assertFalse(self._status)
- self.assertFalse(self._queue)
- self.assertFalse(self.jdm.JobWaiting(job))
-
- _CheckLockInfo()
-
- self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING]))
-
- assert not waiters
-
def testSelfDependency(self):
job = _IdOnlyFakeJob(18937)
@@ -2186,7 +2051,7 @@ class TestJobDependencyManager(unittest.TestCase):
def _FakeStatus(_):
raise errors.JobLost("#msg#")
- jdm = jqueue._JobDependencyManager(_FakeStatus, None)
+ jdm = jqueue._JobDependencyManager(_FakeStatus)
(result, _) = jdm.CheckAndRegister(job, job_id, [])
self.assertEqual(result, self.jdm.ERROR)
self.assertFalse(jdm.JobWaiting(job))
--
2.2.0.rc0.207.ga3a616c