As there is only one job to take care of, we don't need the queueing functionality any more. Hence we can simply process that single job directly in the main thread, thus simplifying the overall setup.
Signed-off-by: Klaus Aehlig <[email protected]> --- lib/jqueue/exec.py | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/lib/jqueue/exec.py b/lib/jqueue/exec.py index a69a1e9..cd597f9 100644 --- a/lib/jqueue/exec.py +++ b/lib/jqueue/exec.py @@ -48,6 +48,8 @@ from ganeti import utils from ganeti import pathutils from ganeti.utils import livelock +from ganeti.jqueue import _JobProcessor + def _GetMasterInfo(): """Retrieves the job id and lock file name from the master process @@ -77,7 +79,6 @@ def main(): utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug) - exit_code = 1 try: logging.debug("Preparing the context and the configuration") context = masterd.GanetiContext(livelock_name) @@ -103,12 +104,19 @@ def main(): prio_change[0] = True signal.signal(signal.SIGUSR1, _User1Handler) - logging.debug("Picking up job %d", job_id) - context.jobqueue.PickupJob(job_id) - - # waiting for the job to finish - time.sleep(1) - while not context.jobqueue.HasJobBeenFinalized(job_id): + job = context.jobqueue.SafeLoadJobFromDisk(job_id, False) + job.SetPid(os.getpid()) + + execfun = mcpu.Processor(context, job_id, job_id).ExecOpCode + proc = _JobProcessor(context.jobqueue, execfun, job) + result = _JobProcessor.DEFER + while result != _JobProcessor.FINISHED: + result = proc() + if result == _JobProcessor.WAITDEP and not cancel[0]: + # Normally, the scheduler should avoid starting a job where the + # dependencies are not yet finalised. So warn, but wait an continue. + logging.warning("Got started despite a dependency not yet finished") + time.sleep(5) if cancel[0]: logging.debug("Got cancel request, cancelling job %d", job_id) r = context.jobqueue.CancelJob(job_id) @@ -128,15 +136,7 @@ def main(): logging.warning("Informed of priority change, but could not" " read new priority") prio_change[0] = False - time.sleep(1) - - # wait until the queue finishes - logging.debug("Waiting for the queue to finish") - while context.jobqueue.PrepareShutdown(): - time.sleep(1) - logging.debug("Shutting the queue down") - context.jobqueue.Shutdown() - exit_code = 0 + except Exception: # pylint: disable=W0703 logging.exception("Exception when trying to run job %d", job_id) finally: @@ -144,7 +144,7 @@ def main(): logging.debug("Removing livelock file %s", livelock_name.GetPath()) os.remove(livelock_name.GetPath()) - sys.exit(exit_code) + sys.exit(0) if __name__ == '__main__': main() -- 2.2.0.rc0.207.ga3a616c
