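Turn JobQueue.SafeLoadJobFromDisk and JobQueue._LoadJobFromDisk into
static methods that take the queue as an explicit first argument, and
update all callers in lib/jqueue/__init__.py and lib/jqueue/exec.py
accordingly. Also initialize the "writable" attribute to None in
_QueuedJob.__init__.

SafeLoadJobFromDisk will be used in subsequent patches to load job
files for jobs whose processes have disappeared. That functionality
will be used to run global post hooks for such jobs.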

Signed-off-by: Oleg Ponomarev <[email protected]>
---
 lib/jqueue/__init__.py | 23 ++++++++++++++---------
 lib/jqueue/exec.py     |  8 ++++----
 2 files changed, 18 insertions(+), 13 deletions(-)

diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index fcddaef..38a2ff9 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -249,6 +249,8 @@ class _QueuedJob(object):
     self.livelock = None
     self.process_id = None
 
+    self.writable = None
+
     self._InitInMemory(self, writable)
 
     assert not self.archived, "New jobs can not be marked as archived"
@@ -1428,7 +1430,7 @@ class JobQueue(object):
       return job
 
     try:
-      job = self._LoadJobFromDisk(job_id, False)
+      job = JobQueue._LoadJobFromDisk(self, job_id, False)
       if job is None:
         return job
     except errors.JobFileCorrupted:
@@ -1449,7 +1451,8 @@ class JobQueue(object):
     logging.debug("Added job %s to the cache", job_id)
     return job
 
-  def _LoadJobFromDisk(self, job_id, try_archived, writable=None):
+  @staticmethod
+  def _LoadJobFromDisk(queue, job_id, try_archived, writable=None):
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
@@ -1462,10 +1465,10 @@ class JobQueue(object):
     @return: either None or the job object
 
     """
-    path_functions = [(self._GetJobPath, False)]
+    path_functions = [(JobQueue._GetJobPath, False)]
 
     if try_archived:
-      path_functions.append((self._GetArchivedJobPath, True))
+      path_functions.append((JobQueue._GetArchivedJobPath, True))
 
     raw_data = None
     archived = None
@@ -1490,13 +1493,14 @@ class JobQueue(object):
 
     try:
       data = serializer.LoadJson(raw_data)
-      job = _QueuedJob.Restore(self, data, writable, archived)
+      job = _QueuedJob.Restore(queue, data, writable, archived)
     except Exception, err: # pylint: disable=W0703
       raise errors.JobFileCorrupted(err)
 
     return job
 
-  def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None):
+  @staticmethod
+  def SafeLoadJobFromDisk(queue, job_id, try_archived, writable=None):
     """Load the given job file from disk.
 
     Given a job file, read, load and restore it in a _QueuedJob format.
@@ -1512,7 +1516,8 @@ class JobQueue(object):
 
     """
     try:
-      return self._LoadJobFromDisk(job_id, try_archived, writable=writable)
+      return JobQueue._LoadJobFromDisk(queue, job_id, try_archived,
+                                       writable=writable)
     except (errors.JobFileCorrupted, EnvironmentError):
       logging.exception("Can't load/parse job %s", job_id)
       return None
@@ -1566,7 +1571,7 @@ class JobQueue(object):
     # Not using in-memory cache as doing so would require an exclusive lock
 
     # Try to load from disk
-    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+    job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False)
 
     if job:
       assert not job.writable, "Got writable job" # pylint: disable=E1101
@@ -1611,7 +1616,7 @@ class JobQueue(object):
         None if the job doesn't exist
 
     """
-    job = self.SafeLoadJobFromDisk(job_id, True, writable=False)
+    job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False)
     if job is not None:
       return job.CalcStatus() in constants.JOBS_FINALIZED
     elif cluster.LUClusterDestroy.clusterHasBeenDestroyed:
diff --git a/lib/jqueue/exec.py b/lib/jqueue/exec.py
index 8e61805..896c002 100644
--- a/lib/jqueue/exec.py
+++ b/lib/jqueue/exec.py
@@ -49,7 +49,7 @@ from ganeti import utils
 from ganeti import pathutils
 from ganeti.utils import livelock
 
-from ganeti.jqueue import _JobProcessor
+from ganeti.jqueue import _JobProcessor, JobQueue
 
 
 def _GetMasterInfo():
@@ -132,7 +132,7 @@ def main():
       prio_change[0] = True
     signal.signal(signal.SIGUSR1, _User1Handler)
 
-    job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+    job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, False)
 
     job.SetPid(os.getpid())
 
@@ -154,7 +154,7 @@ def main():
       if cancel[0]:
         logging.debug("Got cancel request, cancelling job %d", job_id)
         r = context.jobqueue.CancelJob(job_id)
-        job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+        job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, False)
         proc = _JobProcessor(context.jobqueue, execfun, job)
         logging.debug("CancelJob result for job %d: %s", job_id, r)
         cancel[0] = False
@@ -166,7 +166,7 @@ def main():
           utils.RemoveFile(fname)
           logging.debug("Changing priority of job %d to %d", job_id, new_prio)
           r = context.jobqueue.ChangeJobPriority(job_id, new_prio)
-          job = context.jobqueue.SafeLoadJobFromDisk(job_id, False)
+          job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, False)
           proc = _JobProcessor(context.jobqueue, execfun, job)
           logging.debug("Result of changing priority of %d to %d: %s", job_id,
                         new_prio, r)
-- 
2.6.0.rc2.230.g3dd15c0

Reply via email to