On Thu, Nov 7, 2013 at 1:20 PM, Klaus Aehlig <[email protected]> wrote:
> The current restart procedure for masterd includes functionality
> to pick up a job from the queue and restart it, if it hasn't been
> started before. Move this functionality into a separate function
> to be able to have the enqueuing be done by luxid.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
> lib/jqueue.py | 71
> ++++++++++++++++++++++++++++++++---------------------------
> 1 file changed, 39 insertions(+), 32 deletions(-)
>
> diff --git a/lib/jqueue.py b/lib/jqueue.py
> index 9e9d988..2c9f974 100644
> --- a/lib/jqueue.py
> +++ b/lib/jqueue.py
> @@ -1712,6 +1712,44 @@ class JobQueue(object):
> self._wpool.TerminateWorkers()
> raise
>
> + def _PickupJobUnlocked(self, job_id):
> + """Load a job from the job queue
> +
> + Pick up a job that alreday is in the job queue and start/resume it.
s/alreday/already/
> +
> + """
> + job = self._LoadJobUnlocked(job_id)
> +
> + if job is None:
> + logging.warning("Job %s could not be read", job_id)
> + return
> +
> + status = job.CalcStatus()
> +
> + if status == constants.JOB_STATUS_QUEUED:
> + self._EnqueueJobsUnlocked([job])
> + logging.info("Restarting job %s", job.id)
> +
> + elif status in (constants.JOB_STATUS_RUNNING,
> + constants.JOB_STATUS_WAITING,
> + constants.JOB_STATUS_CANCELING):
> + logging.warning("Unfinished job %s found: %s", job.id, job)
> +
> + if status == constants.JOB_STATUS_WAITING:
> + job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
> + self._EnqueueJobsUnlocked([job])
> + logging.info("Restarting job %s", job.id)
> + else:
> + job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
> + "Unclean master daemon shutdown")
> + job.Finalize()
> +
> + self.UpdateJobUnlocked(job)
> +
> + @locking.ssynchronized(_LOCK)
> + def PickupJob(self, job_id):
> + self._PickupJobUnlocked(job_id)
> +
> @locking.ssynchronized(_LOCK)
> @_RequireOpenQueue
> def _InspectQueue(self):
> @@ -1723,8 +1761,6 @@ class JobQueue(object):
> """
> logging.info("Inspecting job queue")
>
> - restartjobs = []
> -
> all_job_ids = self._GetJobIDsUnlocked()
> jobs_count = len(all_job_ids)
> lastinfo = time.time()
> @@ -1736,36 +1772,7 @@ class JobQueue(object):
> idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
> lastinfo = time.time()
>
> - job = self._LoadJobUnlocked(job_id)
> -
> - # a failure in loading the job can cause 'None' to be returned
> - if job is None:
> - continue
> -
> - status = job.CalcStatus()
> -
> - if status == constants.JOB_STATUS_QUEUED:
> - restartjobs.append(job)
> -
> - elif status in (constants.JOB_STATUS_RUNNING,
> - constants.JOB_STATUS_WAITING,
> - constants.JOB_STATUS_CANCELING):
> - logging.warning("Unfinished job %s found: %s", job.id, job)
> -
> - if status == constants.JOB_STATUS_WAITING:
> - # Restart job
> - job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
> - restartjobs.append(job)
> - else:
> - job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
> - "Unclean master daemon shutdown")
> - job.Finalize()
> -
> - self.UpdateJobUnlocked(job)
> -
> - if restartjobs:
> - logging.info("Restarting %s jobs", len(restartjobs))
> - self._EnqueueJobsUnlocked(restartjobs)
> + self._PickupJobUnlocked(job_id)
>
> logging.info("Job queue inspection finished")
>
> --
> 1.8.4.1
>
LGTM, thanks.
Michele
--
Google Germany GmbH
Dienerstr. 12
80331 München
Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores