On Thu, May 21, 2015 at 04:00:03PM +0200, 'Klaus Aehlig' via ganeti-devel wrote:
As there is only one job to take care of, we don't need the queueing
functionality any more. Hence we can simply process that single job
directly in the main thread, thus simplifying the overall setup.
Signed-off-by: Klaus Aehlig <[email protected]>
---
lib/jqueue/exec.py | 34 +++++++++++++++++-----------------
1 file changed, 17 insertions(+), 17 deletions(-)
diff --git a/lib/jqueue/exec.py b/lib/jqueue/exec.py
index a69a1e9..cd597f9 100644
--- a/lib/jqueue/exec.py
+++ b/lib/jqueue/exec.py
@@ -48,6 +48,8 @@ from ganeti import utils
from ganeti import pathutils
from ganeti.utils import livelock
+from ganeti.jqueue import _JobProcessor
+
def _GetMasterInfo():
"""Retrieves the job id and lock file name from the master process
@@ -77,7 +79,6 @@ def main():
utils.SetupLogging(logname, "job-%s" % (job_id,), debug=debug)
- exit_code = 1
try:
logging.debug("Preparing the context and the configuration")
context = masterd.GanetiContext(livelock_name)
@@ -103,12 +104,19 @@ def main():
prio_change[0] = True
signal.signal(signal.SIGUSR1, _User1Handler)
- logging.debug("Picking up job %d", job_id)
- context.jobqueue.PickupJob(job_id)
-
- # waiting for the job to finish
- time.sleep(1)
- while not context.jobqueue.HasJobBeenFinalized(job_id):
+ job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+ job.SetPid(os.getpid())
+
+ execfun = mcpu.Processor(context, job_id, job_id).ExecOpCode
+ proc = _JobProcessor(context.jobqueue, execfun, job)
+ result = _JobProcessor.DEFER
+ while result != _JobProcessor.FINISHED:
+ result = proc()
+ if result == _JobProcessor.WAITDEP and not cancel[0]:
+ # Normally, the scheduler should avoid starting a job where the
+ # dependencies are not yet finalised. So warn, but wait and continue.
+ logging.warning("Got started despite a dependency not yet finished")
+ time.sleep(5)
if cancel[0]:
logging.debug("Got cancel request, cancelling job %d", job_id)
r = context.jobqueue.CancelJob(job_id)
@@ -128,15 +136,7 @@ def main():
logging.warning("Informed of priority change, but could not"
" read new priority")
prio_change[0] = False
- time.sleep(1)
-
- # wait until the queue finishes
- logging.debug("Waiting for the queue to finish")
- while context.jobqueue.PrepareShutdown():
- time.sleep(1)
- logging.debug("Shutting the queue down")
- context.jobqueue.Shutdown()
- exit_code = 0
+
except Exception: # pylint: disable=W0703
logging.exception("Exception when trying to run job %d", job_id)
finally:
@@ -144,7 +144,7 @@ def main():
logging.debug("Removing livelock file %s", livelock_name.GetPath())
os.remove(livelock_name.GetPath())
- sys.exit(exit_code)
+ sys.exit(0)
if __name__ == '__main__':
main()
--
2.2.0.rc0.207.ga3a616c
LGTM