As our jobs are running as processes, their job queue will always only have a single entry. So there is no need to prevent other jobs from entering when preparing a shut down. Therefore, remove this code that is not needed any more.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/jqueue/__init__.py | 55 -------------------------------------------------- 1 file changed, 55 deletions(-) diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py index 7cb39bb..89a745c 100644 --- a/lib/jqueue/__init__.py +++ b/lib/jqueue/__init__.py @@ -1634,31 +1634,6 @@ class _JobDependencyManager: self._enqueue_fn(jobs) -def _RequireNonDrainedQueue(fn): - """Decorator checking for a non-drained queue. - - To be used with functions submitting new jobs. - - """ - def wrapper(self, *args, **kwargs): - """Wrapper function. - - @raise errors.JobQueueDrainError: if the job queue is marked for draining - - """ - # Ok when sharing the big job queue lock, as the drain file is created when - # the lock is exclusive. - # Needs access to protected member, pylint: disable=W0212 - if self._drained: - raise errors.JobQueueDrainError("Job queue is drained, refusing job") - - if not self._accepting_jobs: - raise errors.JobQueueError("Job queue is shutting down, refusing job") - - return fn(self, *args, **kwargs) - return wrapper - - class JobQueue(object): """Queue used to manage the jobs. @@ -1711,7 +1686,6 @@ class JobQueue(object): self._queue_size = None self._UpdateQueueSizeUnlocked() assert ht.TInt(self._queue_size) - self._drained = jstore.CheckDrainFlag() # Job dependencies self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, @@ -1812,10 +1786,6 @@ class JobQueue(object): logging.error("Failed to upload file %s to node %s: %s", file_name, node_name, msg) - # Set queue drained flag - result = \ - self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name], - self._drained) msg = result[node_name].fail_msg if msg: logging.error("Failed to set queue drained flag on node %s: %s", @@ -2107,28 +2077,6 @@ class JobQueue(object): """ self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) - @locking.ssynchronized(_LOCK) - def SetDrainFlag(self, drain_flag): - """Sets the drain flag for the queue. - - @type drain_flag: boolean - @param drain_flag: Whether to set or unset the drain flag - - """ - # Change flag locally - jstore.SetDrainFlag(drain_flag) - - self._drained = drain_flag - - # ... and on all nodes - (names, addrs) = self._GetNodeIp() - result = \ - self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag) - self._CheckRpcResult(result, self._nodes, - "Setting queue drain flag to %s" % drain_flag) - - return True - @classmethod def SubmitJob(cls, ops): """Create and store a new job. @@ -2562,9 +2510,6 @@ class JobQueue(object): be called without interfering with any job. Queued and unfinished jobs will be resumed next time. - Once this function has been called no new job submissions will be accepted - (see L{_RequireNonDrainedQueue}). - @rtype: bool @return: Whether there are any running jobs -- 2.0.0.526.g5318336
