On Wed, Oct 02, 2013 at 02:33:05PM +0200, Klaus Aehlig wrote: > This is necessary during upgrades. There the queue need to be drained, > to avoid new jobs coming to the cluster while in the process of upgrading; > nevertheless, the upgrade process needs to carry out some maintenance, > like redistributing the new configuration.
How about: During an upgrade, the job queue needs to be drained in order to avoid new jobs coming to the cluster. Nevertheless, the upgrade process needs to carry out some maintenance, like redistributing the new configuration. Therefore, this patch provides a means of submitting jobs to a drained queue. > > Of course, once the more fine-grained job queue control will be implemented, > this functionality can be removed. > > Signed-off-by: Klaus Aehlig <[email protected]> > --- > lib/cli.py | 11 +++++++++++ > lib/jqueue.py | 13 +++++++++++++ > lib/luxi.py | 6 ++++++ > lib/server/masterd.py | 8 ++++++++ > src/Ganeti/Luxi.hs | 7 +++++++ > test/hs/Test/Ganeti/Luxi.hs | 2 ++ > test/py/ganeti.rapi.testutils_unittest.py | 1 + > 7 files changed, 48 insertions(+) > > diff --git a/lib/cli.py b/lib/cli.py > index b770b1b..c9314c9 100644 > --- a/lib/cli.py > +++ b/lib/cli.py > @@ -233,6 +233,7 @@ __all__ = [ > "ParseTimespec", > "RunWhileClusterStopped", > "SubmitOpCode", > + "SubmitOpCodeToDrainedQueue", > "SubmitOrSend", > "UsesRPC", > # Formatting functions > @@ -2274,6 +2275,16 @@ def SubmitOpCode(op, cl=None, feedback_fn=None, > opts=None, reporter=None): > return op_results[0] > > > +def SubmitOpCodeToDrainedQueue(op): > + """Forcefully insert a job in the queue, even if it is drained. > + > + """ > + cl = GetClient() > + job_id = cl.SubmitJobToDrainedQueue([op]) > + op_results = PollJob(job_id, cl=cl) > + return op_results[0] > + > + > def SubmitOrSend(op, opts, cl=None, feedback_fn=None): > """Wrapper around SubmitOpCode or SendJob. > > diff --git a/lib/jqueue.py b/lib/jqueue.py > index 8e93bd9..8b2b6e3 100644 > --- a/lib/jqueue.py > +++ b/lib/jqueue.py > @@ -2229,6 +2229,19 @@ class JobQueue(object): > > @locking.ssynchronized(_LOCK) > @_RequireOpenQueue > + def SubmitJobToDrainedQueue(self, ops): > + """Forcefully Create and store a new job. s/Create/create/ > + > + Do so, even if the job queue is drained. 
> + @see: L{_SubmitJobUnlocked} > + > + """ > + (job_id, ) = self._NewSerialsUnlocked(1) > + self._EnqueueJobsUnlocked([self._SubmitJobUnlocked(job_id, ops)]) > + return job_id > + > + @locking.ssynchronized(_LOCK) > + @_RequireOpenQueue > @_RequireNonDrainedQueue > def SubmitManyJobs(self, jobs): > """Create and store multiple jobs. > diff --git a/lib/luxi.py b/lib/luxi.py > index 0de9185..29f3aef 100644 > --- a/lib/luxi.py > +++ b/lib/luxi.py > @@ -51,6 +51,7 @@ KEY_RESULT = "result" > KEY_VERSION = "version" > > REQ_SUBMIT_JOB = "SubmitJob" > +REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = "SubmitJobToDrainedQueue" > REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs" > REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange" > REQ_CANCEL_JOB = "CancelJob" > @@ -91,6 +92,7 @@ REQ_ALL = compat.UniqueFrozenset([ > REQ_SET_DRAIN_FLAG, > REQ_SET_WATCHER_PAUSE, > REQ_SUBMIT_JOB, > + REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, > REQ_SUBMIT_MANY_JOBS, > REQ_WAIT_FOR_JOB_CHANGE, > ]) > @@ -481,6 +483,10 @@ class Client(object): > ops_state = map(lambda op: op.__getstate__(), ops) > return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, )) > > + def SubmitJobToDrainedQueue(self, ops): > + ops_state = map(lambda op: op.__getstate__(), ops) > + return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, )) > + > def SubmitManyJobs(self, jobs): > jobs_state = [] > for ops in jobs: > diff --git a/lib/server/masterd.py b/lib/server/masterd.py > index dd1f5eb..1130e90 100644 > --- a/lib/server/masterd.py > +++ b/lib/server/masterd.py > @@ -295,6 +295,14 @@ class ClientOps: > _LogNewJob(True, job_id, ops) > return job_id > > + elif method == luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE: > + logging.info("Forcefully receiving new job") > + (job_def, ) = args > + ops = [opcodes.OpCode.LoadOpCode(state) for state in job_def] > + job_id = queue.SubmitJobToDrainedQueue(ops) > + _LogNewJob(True, job_id, ops) > + return job_id > + Perhaps avoid some code duplication. Rest LGTM. 
Thanks, Jose > elif method == luxi.REQ_SUBMIT_MANY_JOBS: > logging.info("Receiving multiple jobs") > (job_defs, ) = args > diff --git a/src/Ganeti/Luxi.hs b/src/Ganeti/Luxi.hs > index ca602b4..570992f 100644 > --- a/src/Ganeti/Luxi.hs > +++ b/src/Ganeti/Luxi.hs > @@ -150,6 +150,9 @@ $(genLuxiOp "LuxiOp" > , (luxiReqSubmitJob, > [ simpleField "job" [t| [MetaOpCode] |] ] > ) > + , (luxiReqSubmitJobToDrainedQueue, > + [ simpleField "job" [t| [MetaOpCode] |] ] > + ) > , (luxiReqSubmitManyJobs, > [ simpleField "ops" [t| [[MetaOpCode]] |] ] > ) > @@ -368,6 +371,10 @@ decodeCall (LuxiCall call args) = > [ops1] <- fromJVal args > ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) > ops1 > return $ SubmitJob ops2 > + ReqSubmitJobToDrainedQueue -> do > + [ops1] <- fromJVal args > + ops2 <- mapM (fromJResult (luxiReqToRaw call) . J.readJSON) > ops1 > + return $ SubmitJobToDrainedQueue ops2 > ReqSubmitManyJobs -> do > [ops1] <- fromJVal args > ops2 <- mapM (fromJResult (luxiReqToRaw call) . 
J.readJSON) > ops1 > diff --git a/test/hs/Test/Ganeti/Luxi.hs b/test/hs/Test/Ganeti/Luxi.hs > index f6ac5be..f687b05 100644 > --- a/test/hs/Test/Ganeti/Luxi.hs > +++ b/test/hs/Test/Ganeti/Luxi.hs > @@ -77,6 +77,8 @@ instance Arbitrary Luxi.LuxiOp where > kind <- arbitrary > Luxi.QueryTags kind <$> genLuxiTagName kind > Luxi.ReqSubmitJob -> Luxi.SubmitJob <$> resize maxOpCodes arbitrary > + Luxi.ReqSubmitJobToDrainedQueue -> Luxi.SubmitJobToDrainedQueue <$> > + resize maxOpCodes arbitrary > Luxi.ReqSubmitManyJobs -> Luxi.SubmitManyJobs <$> > resize maxOpCodes arbitrary > Luxi.ReqWaitForJobChange -> Luxi.WaitForJobChange <$> arbitrary <*> > diff --git a/test/py/ganeti.rapi.testutils_unittest.py > b/test/py/ganeti.rapi.testutils_unittest.py > index ea262fd..841d5ae 100755 > --- a/test/py/ganeti.rapi.testutils_unittest.py > +++ b/test/py/ganeti.rapi.testutils_unittest.py > @@ -39,6 +39,7 @@ import testutils > > KNOWN_UNUSED_LUXI = compat.UniqueFrozenset([ > luxi.REQ_SUBMIT_MANY_JOBS, > + luxi.REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, > luxi.REQ_ARCHIVE_JOB, > luxi.REQ_AUTO_ARCHIVE_JOBS, > luxi.REQ_CHANGE_JOB_PRIORITY, > -- > 1.8.4 > -- Jose Antonio Lopes Ganeti Engineering Google Germany GmbH Dienerstr. 12, 80331, München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores Steuernummer: 48/725/00206 Umsatzsteueridentifikationsnummer: DE813741370
