The job-queue worker pool is no longer needed, now that the only job is handled in the main thread. The only purpose of the job queue now is to pass contextual information around.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/jqueue/__init__.py | 248 +------------------------------------- test/py/ganeti.jqueue_unittest.py | 139 +-------------------- 2 files changed, 4 insertions(+), 383 deletions(-) diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py index cc3e4f3..2fdad91 100644 --- a/lib/jqueue/__init__.py +++ b/lib/jqueue/__init__.py @@ -33,9 +33,6 @@ Locking: there's a single, large lock in the L{JobQueue} class. It's used by all other classes in this module. -@var JOBQUEUE_THREADS: the number of worker threads we start for - processing jobs - """ import logging @@ -56,7 +53,6 @@ except ImportError: from ganeti import asyncnotifier from ganeti import constants from ganeti import serializer -from ganeti import workerpool from ganeti import locking from ganeti import luxi from ganeti import opcodes @@ -77,8 +73,6 @@ from ganeti import vcluster from ganeti.cmdlib import cluster -JOBQUEUE_THREADS = 1 - # member lock names to be passed to @ssynchronized decorator _LOCK = "_lock" _QUEUE = "_queue" @@ -1147,114 +1141,6 @@ class _JobProcessor(object): queue.release() -def _EvaluateJobProcessorResult(depmgr, job, result): - """Looks at a result from L{_JobProcessor} for a job. - - To be used in a L{_JobQueueWorker}. - - """ - if result == _JobProcessor.FINISHED: - # Notify waiting jobs - depmgr.NotifyWaiters(job.id) - - elif result == _JobProcessor.DEFER: - # Schedule again - raise workerpool.DeferTask(priority=job.CalcPriority()) - - elif result == _JobProcessor.WAITDEP: - # No-op, dependency manager will re-schedule - pass - - else: - raise errors.ProgrammerError("Job processor returned unknown status %s" % - (result, )) - - -class _JobQueueWorker(workerpool.BaseWorker): - """The actual job workers. - - """ - def RunTask(self, job): # pylint: disable=W0221 - """Job executor. 
- - @type job: L{_QueuedJob} - @param job: the job to be processed - - """ - assert job.writable, "Expected writable job" - - # Ensure only one worker is active on a single job. If a job registers for - # a dependency job, and the other job notifies before the first worker is - # done, the job can end up in the tasklist more than once. - job.processor_lock.acquire() - try: - return self._RunTaskInner(job) - finally: - job.processor_lock.release() - - def _RunTaskInner(self, job): - """Executes a job. - - Must be called with per-job lock acquired. - - """ - queue = job.queue - assert queue == self.pool.queue - - setname_fn = lambda op: self.SetTaskName(self._GetWorkerName(job, op)) - setname_fn(None) - - proc = mcpu.Processor(queue.context, job.id) - - # Create wrapper for setting thread name - wrap_execop_fn = compat.partial(self._WrapExecOpCode, setname_fn, - proc.ExecOpCode) - - _EvaluateJobProcessorResult(queue.depmgr, job, - _JobProcessor(queue, wrap_execop_fn, job)()) - - @staticmethod - def _WrapExecOpCode(setname_fn, execop_fn, op, *args, **kwargs): - """Updates the worker thread name to include a short summary of the opcode. - - @param setname_fn: Callable setting worker thread name - @param execop_fn: Callable for executing opcode (usually - L{mcpu.Processor.ExecOpCode}) - - """ - setname_fn(op) - try: - return execop_fn(op, *args, **kwargs) - finally: - setname_fn(None) - - @staticmethod - def _GetWorkerName(job, op): - """Sets the worker thread name. - - @type job: L{_QueuedJob} - @type op: L{opcodes.OpCode} - - """ - parts = ["Job%s" % job.id] - - if op: - parts.append(op.TinySummary()) - - return "/".join(parts) - - -class _JobQueueWorkerPool(workerpool.WorkerPool): - """Simple class implementing a job-processing workerpool. - - """ - def __init__(self, queue): - super(_JobQueueWorkerPool, self).__init__("Jq", - JOBQUEUE_THREADS, - _JobQueueWorker) - self.queue = queue - - class _JobDependencyManager: """Keeps track of job dependencies. 
@@ -1265,12 +1151,11 @@ class _JobDependencyManager: CONTINUE, WRONGSTATUS) = range(1, 6) - def __init__(self, getstatus_fn, enqueue_fn): + def __init__(self, getstatus_fn): """Initializes this class. """ self._getstatus_fn = getstatus_fn - self._enqueue_fn = enqueue_fn self._waiters = {} self._lock = locking.SharedLock("JobDepMgr") @@ -1365,31 +1250,6 @@ class _JobDependencyManager: if not waiters]: del self._waiters[job_id] - def NotifyWaiters(self, job_id): - """Notifies all jobs waiting for a certain job ID. - - @attention: Do not call until L{CheckAndRegister} returned a status other - than C{WAITDEP} for C{job_id}, or behaviour is undefined - @type job_id: int - @param job_id: Job ID - - """ - assert ht.TJobId(job_id) - - self._lock.acquire() - try: - self._RemoveEmptyWaitersUnlocked() - - jobs = self._waiters.pop(job_id, None) - finally: - self._lock.release() - - if jobs: - # Re-add jobs to workerpool - logging.debug("Re-adding %s jobs which were waiting for job %s", - len(jobs), job_id) - self._enqueue_fn(jobs) - class JobQueue(object): """Queue used to manage the jobs. @@ -1438,59 +1298,7 @@ class JobQueue(object): assert ht.TInt(self._queue_size) # Job dependencies - self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, - self._EnqueueJobs) - - # Setup worker pool - self._wpool = _JobQueueWorkerPool(self) - - def _PickupJobUnlocked(self, job_id): - """Load a job from the job queue - - Pick up a job that already is in the job queue and start/resume it. 
- - """ - if self.primary_jid: - logging.warning("Job process asked to pick up %s, but already has %s", - job_id, self.primary_jid) - - self.primary_jid = int(job_id) - - job = self._LoadJobUnlocked(job_id) - - if job is None: - logging.warning("Job %s could not be read", job_id) - return - - job.AddReasons(pickup=True) - - status = job.CalcStatus() - if status == constants.JOB_STATUS_QUEUED: - job.SetPid(os.getpid()) - self._EnqueueJobsUnlocked([job]) - logging.info("Restarting job %s", job.id) - - elif status in (constants.JOB_STATUS_RUNNING, - constants.JOB_STATUS_WAITING, - constants.JOB_STATUS_CANCELING): - logging.warning("Unfinished job %s found: %s", job.id, job) - - if status == constants.JOB_STATUS_WAITING: - job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None) - job.SetPid(os.getpid()) - self._EnqueueJobsUnlocked([job]) - logging.info("Restarting job %s", job.id) - else: - to_encode = errors.OpExecError("Unclean master daemon shutdown") - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, - _EncodeOpError(to_encode)) - job.Finalize() - - self.UpdateJobUnlocked(job) - - @locking.ssynchronized(_LOCK) - def PickupJob(self, job_id): - self._PickupJobUnlocked(job_id) + self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies) def _GetRpc(self, address_list): """Gets RPC runner with context. @@ -1871,28 +1679,6 @@ class JobQueue(object): return (True, result) - @locking.ssynchronized(_LOCK) - def _EnqueueJobs(self, jobs): - """Helper function to add jobs to worker pool's queue. - - @type jobs: list - @param jobs: List of all jobs - - """ - return self._EnqueueJobsUnlocked(jobs) - - def _EnqueueJobsUnlocked(self, jobs): - """Helper function to add jobs to worker pool's queue. 
- - @type jobs: list - @param jobs: List of all jobs - - """ - assert self._lock.is_owned(shared=0), "Must own lock in exclusive mode" - self._wpool.AddManyTasks([(job, ) for job in jobs], - priority=[job.CalcPriority() for job in jobs], - task_id=map(_GetIdAttr, jobs)) - def _GetJobStatusForDependencies(self, job_id): """Gets the status of a job for dependencies. @@ -1993,13 +1779,6 @@ class JobQueue(object): def fn(job): (success, msg) = job.ChangePriority(priority) - - if success: - try: - self._wpool.ChangeTaskPriority(job.id, job.CalcPriority()) - except workerpool.NoSuchTask: - logging.debug("Job %s is not in workerpool at this time", job.id) - return (success, msg) return self._ModifyJobUnlocked(job_id, fn) @@ -2030,26 +1809,3 @@ class JobQueue(object): self.UpdateJobUnlocked(job) return (success, msg) - - @locking.ssynchronized(_LOCK) - def PrepareShutdown(self): - """Prepare to stop the job queue. - - Returns whether there are any jobs currently running. If the latter is the - case, the job queue is not yet ready for shutdown. Once this function - returns C{True} L{Shutdown} can be called without interfering with any job. - - @rtype: bool - @return: Whether there are any running jobs - - """ - return self._wpool.HasRunningTasks() - - @locking.ssynchronized(_LOCK) - def Shutdown(self): - """Stops the job queue. - - This shutdowns all the worker threads an closes the queue. 
- - """ - self._wpool.TerminateWorkers() diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py index e249057..4080662 100755 --- a/test/py/ganeti.jqueue_unittest.py +++ b/test/py/ganeti.jqueue_unittest.py @@ -1531,43 +1531,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils): self.assertRaises(IndexError, queue.GetNextUpdate) -class TestEvaluateJobProcessorResult(unittest.TestCase): - def testFinished(self): - depmgr = _FakeDependencyManager() - job = _IdOnlyFakeJob(30953) - jqueue._EvaluateJobProcessorResult(depmgr, job, - jqueue._JobProcessor.FINISHED) - self.assertEqual(depmgr.GetNextNotification(), job.id) - self.assertRaises(IndexError, depmgr.GetNextNotification) - - def testDefer(self): - depmgr = _FakeDependencyManager() - job = _IdOnlyFakeJob(11326, priority=5463) - try: - jqueue._EvaluateJobProcessorResult(depmgr, job, - jqueue._JobProcessor.DEFER) - except workerpool.DeferTask, err: - self.assertEqual(err.priority, 5463) - else: - self.fail("Didn't raise exception") - self.assertRaises(IndexError, depmgr.GetNextNotification) - - def testWaitdep(self): - depmgr = _FakeDependencyManager() - job = _IdOnlyFakeJob(21317) - jqueue._EvaluateJobProcessorResult(depmgr, job, - jqueue._JobProcessor.WAITDEP) - self.assertRaises(IndexError, depmgr.GetNextNotification) - - def testOther(self): - depmgr = _FakeDependencyManager() - job = _IdOnlyFakeJob(5813) - self.assertRaises(errors.ProgrammerError, - jqueue._EvaluateJobProcessorResult, - depmgr, job, "Other result") - self.assertRaises(IndexError, depmgr.GetNextNotification) - - class _FakeTimeoutStrategy: def __init__(self, timeouts): self.timeouts = timeouts @@ -1882,19 +1845,13 @@ class TestJobDependencyManager(unittest.TestCase): def setUp(self): self._status = [] self._queue = [] - self.jdm = jqueue._JobDependencyManager(self._GetStatus, self._Enqueue) + self.jdm = jqueue._JobDependencyManager(self._GetStatus) def _GetStatus(self, job_id): (exp_job_id, result) = 
self._status.pop(0) self.assertEqual(exp_job_id, job_id) return result - def _Enqueue(self, jobs): - self.assertFalse(self.jdm._lock.is_owned(), - msg=("Must not own manager lock while re-adding jobs" - " (potential deadlock)")) - self._queue.append(jobs) - def testNotFinalizedThenCancel(self): job = _IdOnlyFakeJob(17697) job_id = str(28625) @@ -2019,26 +1976,6 @@ class TestJobDependencyManager(unittest.TestCase): self.assertFalse(self.jdm.JobWaiting(job)) self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING])) - def testNotify(self): - job = _IdOnlyFakeJob(8227) - job_id = str(4113) - - self._status.append((job_id, constants.JOB_STATUS_RUNNING)) - (result, _) = self.jdm.CheckAndRegister(job, job_id, []) - self.assertEqual(result, self.jdm.WAIT) - self.assertFalse(self._status) - self.assertFalse(self._queue) - self.assertTrue(self.jdm.JobWaiting(job)) - self.assertEqual(self.jdm._waiters, { - job_id: set([job]), - }) - - self.jdm.NotifyWaiters(job_id) - self.assertFalse(self._status) - self.assertFalse(self.jdm._waiters) - self.assertFalse(self.jdm.JobWaiting(job)) - self.assertEqual(self._queue, [set([job])]) - def testWrongStatus(self): job = _IdOnlyFakeJob(10102) job_id = str(1271) @@ -2100,78 +2037,6 @@ class TestJobDependencyManager(unittest.TestCase): job_id: set(), }) - # Force cleanup - self.jdm.NotifyWaiters("0") - self.assertFalse(self.jdm._waiters) - self.assertFalse(self._status) - self.assertFalse(self._queue) - - def testMultipleWaiting(self): - # Use a deterministic random generator - rnd = random.Random(21402) - - job_ids = map(str, rnd.sample(range(1, 10000), 150)) - - waiters = dict((job_ids.pop(), - set(map(_IdOnlyFakeJob, - [job_ids.pop() - for _ in range(rnd.randint(1, 20))]))) - for _ in range(10)) - - # Ensure there are no duplicate job IDs - assert not utils.FindDuplicates(waiters.keys() + - [job.id - for jobs in waiters.values() - for job in jobs]) - - # Register all jobs as waiters - for job_id, job in [(job_id, job) - for (job_id, 
jobs) in waiters.items() - for job in jobs]: - self._status.append((job_id, constants.JOB_STATUS_QUEUED)) - (result, _) = self.jdm.CheckAndRegister(job, job_id, - [constants.JOB_STATUS_SUCCESS]) - self.assertEqual(result, self.jdm.WAIT) - self.assertFalse(self._status) - self.assertFalse(self._queue) - self.assertTrue(self.jdm.JobWaiting(job)) - - self.assertEqual(self.jdm._waiters, waiters) - - def _MakeSet((name, mode, owner_names, pending)): - return (name, mode, owner_names, - [(pendmode, set(pend)) for (pendmode, pend) in pending]) - - def _CheckLockInfo(): - info = self.jdm.GetLockInfo([query.LQ_PENDING]) - self.assertEqual(sorted(map(_MakeSet, info)), sorted([ - ("job/%s" % job_id, None, None, - [("job", set([job.id for job in jobs]))]) - for job_id, jobs in waiters.items() - if jobs - ])) - - _CheckLockInfo() - - # Notify in random order - for job_id in rnd.sample(waiters, len(waiters)): - # Remove from pending waiter list - jobs = waiters.pop(job_id) - for job in jobs: - self._status.append((job_id, constants.JOB_STATUS_SUCCESS)) - (result, _) = self.jdm.CheckAndRegister(job, job_id, - [constants.JOB_STATUS_SUCCESS]) - self.assertEqual(result, self.jdm.CONTINUE) - self.assertFalse(self._status) - self.assertFalse(self._queue) - self.assertFalse(self.jdm.JobWaiting(job)) - - _CheckLockInfo() - - self.assertFalse(self.jdm.GetLockInfo([query.LQ_PENDING])) - - assert not waiters - def testSelfDependency(self): job = _IdOnlyFakeJob(18937) @@ -2186,7 +2051,7 @@ class TestJobDependencyManager(unittest.TestCase): def _FakeStatus(_): raise errors.JobLost("#msg#") - jdm = jqueue._JobDependencyManager(_FakeStatus, None) + jdm = jqueue._JobDependencyManager(_FakeStatus) (result, _) = jdm.CheckAndRegister(job, job_id, []) self.assertEqual(result, self.jdm.ERROR) self.assertFalse(jdm.JobWaiting(job)) -- 2.2.0.rc0.207.ga3a616c
