Turn JobQueue._LoadJobFromDisk and JobQueue.SafeLoadJobFromDisk into static methods that take the queue as an explicit argument. SafeLoadJobFromDisk will be used in subsequent patches to load job files for jobs whose processes have disappeared; that functionality is needed in order to run the global post hooks for such jobs.
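For illustration only, a rough sketch of how a later patch might use the now-static method for a job whose process has died. The helper name and its surroundings are hypothetical and not part of this patch; only the JobQueue.SafeLoadJobFromDisk call reflects the interface changed here:

  import logging

  from ganeti.jqueue import JobQueue


  def _HandleDisappearedJobProcess(queue, job_id):
    """Load a job whose process has disappeared (illustration only)."""
    # Read-only load straight from disk; also check the archive, in case the
    # job was finalized and archived before its process went away.
    job = JobQueue.SafeLoadJobFromDisk(queue, job_id, True, writable=False)
    if job is None:
      logging.warning("Job %s could not be loaded from disk", job_id)
      return None
    # A later patch would run the global post hooks for this job here.
    return job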
Signed-off-by: Oleg Ponomarev <[email protected]>
---
 lib/jqueue/__init__.py | 23 ++++++++++++++---------
 lib/jqueue/exec.py     |  8 ++++----
 2 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index fcddaef..38a2ff9 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -249,6 +249,8 @@ class _QueuedJob(object):
     self.livelock = None
     self.process_id = None
 
+    self.writable = None
+
     self._InitInMemory(self, writable)
 
     assert not self.archived, "New jobs can not be marked as archived"
@@ -1428,7 +1430,7 @@ class JobQueue(object):
       return job
 
     try:
-      job = self._LoadJobFromDisk(job_id, False)
+      job = JobQueue._LoadJobFromDisk(self, job_id, False)
       if job is None:
         return job
     except errors.JobFileCorrupted:
@@ -1449,7 +1451,8 @@ class JobQueue(object):
     logging.debug("Added job %s to the cache", job_id)
     return job
 
-  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
+  @staticmethod
+  def _LoadJobFromDisk(queue, job_id, try_archived, writable=None):
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
@@ -1462,10 +1465,10 @@ class JobQueue(object):
     @return: either None or the job object
 
     """
-    path_functions = [(self._GetJobPath, False)]
+    path_functions = [(JobQueue._GetJobPath, False)]
 
     if try_archived:
-      path_functions.append((self._GetArchivedJobPath, True))
+      path_functions.append((JobQueue._GetArchivedJobPath, True))
 
     raw_data = None
     archived = None
@@ -1490,13 +1493,14 @@ class JobQueue(object):
 
     try:
       data = serializer.LoadJson(raw_data)
-      job = _QueuedJob.Restore(self, data, writable, archived)
+      job = _QueuedJob.Restore(queue, data, writable, archived)
     except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
     return job
 
-  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
+  @staticmethod
+  def SafeLoadJobFromDisk(queue, job_id, try_archived, writable=None):
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
@@ -1512,7 +1516,8 @@ class JobQueue(object):
 
     """
     try:
-      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
+      return JobQueue._LoadJobFromDisk(queue, job_id, try_archived,
+                                       writable=writable)
     except (errors.JobFileCorrupted, EnvironmentError):
       logging.exception("Can't load/parse job %s", job_id)
       return None
@@ -1566,7 +1571,7 @@ class JobQueue(object):
     # Not using in-memory cache as doing so would require an exclusive lock
 
     # Try to load from disk
-    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+    job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False)
 
     if job:
       assert not job.writable, "Got writable job" # pylint: disable=E1101
@@ -1611,7 +1616,7 @@ class JobQueue(object):
     None if the job doesn't exist
 
     """
-    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+    job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False)
     if job is not None:
       return job.CalcStatus() in constants.JOBS_FINALIZED
     elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
diff --git a/lib/jqueue/exec.py b/lib/jqueue/exec.py
index 8e61805..896c002 100644
--- a/lib/jqueue/exec.py
+++ b/lib/jqueue/exec.py
@@ -49,7 +49,7 @@ from ganeti import utils
 from ganeti import pathutils
 from ganeti.utils import livelock
 
-from ganeti.jqueue import _JobProcessor
+from ganeti.jqueue import _JobProcessor, JobQueue
 
 
 def _GetMasterInfo():
@@ -132,7 +132,7 @@ def main():
     prio_change[0] = True
   signal.signal(signal.SIGUSR1, _User1Handler)
 
-  job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+  job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, False)
 
   job.SetPid(os.getpid())
 
@@ -154,7 +154,7 @@ def main():
     if cancel[0]:
       logging.debug("Got cancel request, cancelling job %d", job_id)
       r = context.jobqueue.CancelJob(job_id)
-      job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+      job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, False)
       proc = _JobProcessor(context.jobqueue, execfun, job)
       logging.debug("CancelJob result for job %d: %s", job_id, r)
       cancel[0] = False
@@ -166,7 +166,7 @@ def main():
        utils.RemoveFile(fname)
      logging.debug("Changing priority of job %d to %d", job_id, new_prio)
      r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
-     job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+     job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, False)
      proc = _JobProcessor(context.jobqueue, execfun, job)
      logging.debug("Result of changing priority of %d to %d: %s",
                    job_id, new_prio, r)
-- 
2.6.0.rc2.230.g3dd15c0
