LGTM

On Fri, Nov 20, 2015 at 5:11 PM, 'Oleg Ponomarev' via ganeti-devel <[email protected]> wrote:
> SafeLoadJobFromDisk will be used in further patches in order to load > job files for jobs whose processes have disappeared. The functionality > will be used to run global post hooks for such jobs. > > Signed-off-by: Oleg Ponomarev <[email protected]> > --- > lib/jqueue/__init__.py | 23 ++++++++++++++--------- > lib/jqueue/exec.py | 8 ++++---- > 2 files changed, 18 insertions(+), 13 deletions(-) > > diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py > index fcddaef..38a2ff9 100644 > --- a/lib/jqueue/__init__.py > +++ b/lib/jqueue/__init__.py > @@ -249,6 +249,8 @@ class _QueuedJob(object): > self.livelock = None > self.process_id = None > > + self.writable = None > + > self._InitInMemory(self, writable) > > assert not self.archived, "New jobs can not be marked as archived" > @@ -1428,7 +1430,7 @@ class JobQueue(object): > return job > > try: > - job = self._LoadJobFromDisk(job_id, False) > + job = JobQueue._LoadJobFromDisk(self, job_id, False) > if job is None: > return job > except errors.JobFileCorrupted: > @@ -1449,7 +1451,8 @@ class JobQueue(object): > logging.debug("Added job %s to the cache", job_id) > return job > > - def _LoadJobFromDisk(self, job_id, try_archived, writable=None): > + @staticmethod > + def _LoadJobFromDisk(queue, job_id, try_archived, writable=None): > """Load the given job file from disk. > > Given a job file, read, load and restore it in a _QueuedJob format. 
> @@ -1462,10 +1465,10 @@ class JobQueue(object): > @return: either None or the job object > > """ > - path_functions = [(self._GetJobPath, False)] > + path_functions = [(JobQueue._GetJobPath, False)] > > if try_archived: > - path_functions.append((self._GetArchivedJobPath, True)) > + path_functions.append((JobQueue._GetArchivedJobPath, True)) > > raw_data = None > archived = None > @@ -1490,13 +1493,14 @@ class JobQueue(object): > > try: > data = serializer.LoadJson(raw_data) > - job = _QueuedJob.Restore(self, data, writable, archived) > + job = _QueuedJob.Restore(queue, data, writable, archived) > except Exception, err: # pylint: disable=W0703 > raise errors.JobFileCorrupted(err) > > return job > > - def SafeLoadJobFromDisk(self, job_id, try_archived, writable=None): > + @staticmethod > + def SafeLoadJobFromDisk(queue, job_id, try_archived, writable=None): > """Load the given job file from disk. > > Given a job file, read, load and restore it in a _QueuedJob format. > @@ -1512,7 +1516,8 @@ class JobQueue(object): > > """ > try: > - return self._LoadJobFromDisk(job_id, try_archived, > writable=writable) > + return JobQueue._LoadJobFromDisk(queue, job_id, try_archived, > + writable=writable) > except (errors.JobFileCorrupted, EnvironmentError): > logging.exception("Can't load/parse job %s", job_id) > return None > @@ -1566,7 +1571,7 @@ class JobQueue(object): > # Not using in-memory cache as doing so would require an exclusive > lock > > # Try to load from disk > - job = self.SafeLoadJobFromDisk(job_id, True, writable=False) > + job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False) > > if job: > assert not job.writable, "Got writable job" # pylint: disable=E1101 > @@ -1611,7 +1616,7 @@ class JobQueue(object): > None if the job doesn't exist > > """ > - job = self.SafeLoadJobFromDisk(job_id, True, writable=False) > + job = JobQueue.SafeLoadJobFromDisk(self, job_id, True, writable=False) > if job is not None: > return job.CalcStatus() in 
constants.JOBS_FINALIZED > elif cluster.LUClusterDestroy.clusterHasBeenDestroyed: > diff --git a/lib/jqueue/exec.py b/lib/jqueue/exec.py > index 8e61805..896c002 100644 > --- a/lib/jqueue/exec.py > +++ b/lib/jqueue/exec.py > @@ -49,7 +49,7 @@ from ganeti import utils > from ganeti import pathutils > from ganeti.utils import livelock > > -from ganeti.jqueue import _JobProcessor > +from ganeti.jqueue import _JobProcessor, JobQueue > > > def _GetMasterInfo(): > @@ -132,7 +132,7 @@ def main(): > prio_change[0] = True > signal.signal(signal.SIGUSR1, _User1Handler) > > - job = context.jobqueue.SafeLoadJobFromDisk(job_id, False) > + job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, False) > > job.SetPid(os.getpid()) > > @@ -154,7 +154,7 @@ def main(): > if cancel[0]: > logging.debug("Got cancel request, cancelling job %d", job_id) > r = context.jobqueue.CancelJob(job_id) > - job = context.jobqueue.SafeLoadJobFromDisk(job_id, False) > + job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, > False) > proc = _JobProcessor(context.jobqueue, execfun, job) > logging.debug("CancelJob result for job %d: %s", job_id, r) > cancel[0] = False > @@ -166,7 +166,7 @@ def main(): > utils.RemoveFile(fname) > logging.debug("Changing priority of job %d to %d", job_id, > new_prio) > r = context.jobqueue.ChangeJobPriority(job_id, new_prio) > - job = context.jobqueue.SafeLoadJobFromDisk(job_id, False) > + job = JobQueue.SafeLoadJobFromDisk(context.jobqueue, job_id, > False) > proc = _JobProcessor(context.jobqueue, execfun, job) > logging.debug("Result of changing priority of %d to %d: %s", > job_id, > new_prio, r) > -- > 2.6.0.rc2.230.g3dd15c0 > > Hrvoje Ribicic Ganeti Engineering Google Germany GmbH Dienerstr. 12, 80331, München Geschäftsführer: Matthew Scott Sucherman, Paul Terence Manicle Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Diese E-Mail ist vertraulich. 
Wenn Sie nicht der richtige Adressat sind, leiten Sie diese bitte nicht weiter, informieren Sie den Absender und löschen Sie die E-Mail und alle Anhänge. Vielen Dank. This e-mail is confidential. If you are not the right addressee please do not forward it, please inform the sender, and please erase this e-mail including any attachments. Thanks.
