LGTM
On Wed, Jun 11, 2014 at 6:05 PM, 'Klaus Aehlig' via ganeti-devel < [email protected]> wrote: > As our jobs are running as processes, their job queue will > always only have a single entry. So there is no need to > prevent other jobs from entering when preparing a shut down. > Therefore, remove this code that is not needed any more. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/jqueue/__init__.py | 55 > -------------------------------------------------- > 1 file changed, 55 deletions(-) > > diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py > index 7cb39bb..89a745c 100644 > --- a/lib/jqueue/__init__.py > +++ b/lib/jqueue/__init__.py > @@ -1634,31 +1634,6 @@ class _JobDependencyManager: > self._enqueue_fn(jobs) > > > -def _RequireNonDrainedQueue(fn): > - """Decorator checking for a non-drained queue. > - > - To be used with functions submitting new jobs. > - > - """ > - def wrapper(self, *args, **kwargs): > - """Wrapper function. > - > - @raise errors.JobQueueDrainError: if the job queue is marked for > draining > - > - """ > - # Ok when sharing the big job queue lock, as the drain file is > created when > - # the lock is exclusive. > - # Needs access to protected member, pylint: disable=W0212 > - if self._drained: > - raise errors.JobQueueDrainError("Job queue is drained, refusing > job") > - > - if not self._accepting_jobs: > - raise errors.JobQueueError("Job queue is shutting down, refusing > job") > - > - return fn(self, *args, **kwargs) > - return wrapper > - > - > class JobQueue(object): > """Queue used to manage the jobs. > > @@ -1711,7 +1686,6 @@ class JobQueue(object): > self._queue_size = None > self._UpdateQueueSizeUnlocked() > assert ht.TInt(self._queue_size) > - self._drained = jstore.CheckDrainFlag() > > # Job dependencies > self.depmgr = _JobDependencyManager(self._GetJobStatusForDependencies, > @@ -1812,10 +1786,6 @@ class JobQueue(object): > logging.error("Failed to upload file %s to node %s: %s", > file_name, node_name, msg) > > - # Set queue drained flag > - result = \ > - self._GetRpc(addrs).call_jobqueue_set_drain_flag([node_name], > - self._drained) > msg = result[node_name].fail_msg > if msg: > logging.error("Failed to set queue drained flag on node %s: %s", > @@ -2107,28 +2077,6 @@ class JobQueue(object): > """ > self._queue_size = len(self._GetJobIDsUnlocked(sort=False)) > > - @locking.ssynchronized(_LOCK) > - def SetDrainFlag(self, drain_flag): > - """Sets the drain flag for the queue. > - > - @type drain_flag: boolean > - @param drain_flag: Whether to set or unset the drain flag > - > - """ > - # Change flag locally > - jstore.SetDrainFlag(drain_flag) > - > - self._drained = drain_flag > - > - # ... and on all nodes > - (names, addrs) = self._GetNodeIp() > - result = \ > - self._GetRpc(addrs).call_jobqueue_set_drain_flag(names, drain_flag) > - self._CheckRpcResult(result, self._nodes, > - "Setting queue drain flag to %s" % drain_flag) > - > - return True > - > @classmethod > def SubmitJob(cls, ops): > """Create and store a new job. > @@ -2562,9 +2510,6 @@ class JobQueue(object): > be called without interfering with any job. Queued and unfinished > jobs will > be resumed next time. > > - Once this function has been called no new job submissions will be > accepted > - (see L{_RequireNonDrainedQueue}). > - > @rtype: bool > @return: Whether there are any running jobs > > -- > 2.0.0.526.g5318336 > >
