On Thu, Nov 7, 2013 at 1:20 PM, Klaus Aehlig <[email protected]> wrote:
> The current restart procedure for masterd includes functionality
> to pick up a job from the queue and restart it, if it hasn't been
> started before. Move this functionality into a separate function
> to be able to have the enqueuing be done by luxid.
>
> Signed-off-by: Klaus Aehlig <[email protected]>
> ---
>  lib/jqueue.py | 71 
> ++++++++++++++++++++++++++++++++---------------------------
>  1 file changed, 39 insertions(+), 32 deletions(-)
>
> diff --git a/lib/jqueue.py b/lib/jqueue.py
> index 9e9d988..2c9f974 100644
> --- a/lib/jqueue.py
> +++ b/lib/jqueue.py
> @@ -1712,6 +1712,44 @@ class JobQueue(object):
>        self._wpool.TerminateWorkers()
>        raise
>
> +  def _PickupJobUnlocked(self, job_id):
> +    """Load a job from the job queue
> +
> +    Pick up a job that alreday is in the job queue and start/resume it.

s/alreday/already/

> +
> +    """
> +    job = self._LoadJobUnlocked(job_id)
> +
> +    if job is None:
> +      logging.warning("Job %s could not be read", job_id)
> +      return
> +
> +    status = job.CalcStatus()
> +
> +    if status == constants.JOB_STATUS_QUEUED:
> +      self._EnqueueJobsUnlocked([job])
> +      logging.info("Restarting job %s", job.id)
> +
> +    elif status in (constants.JOB_STATUS_RUNNING,
> +                    constants.JOB_STATUS_WAITING,
> +                    constants.JOB_STATUS_CANCELING):
> +      logging.warning("Unfinished job %s found: %s", job.id, job)
> +
> +      if status == constants.JOB_STATUS_WAITING:
> +        job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
> +        self._EnqueueJobsUnlocked([job])
> +        logging.info("Restarting job %s", job.id)
> +      else:
> +        job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
> +                              "Unclean master daemon shutdown")
> +        job.Finalize()
> +
> +    self.UpdateJobUnlocked(job)
> +
> +  @locking.ssynchronized(_LOCK)
> +  def PickupJob(self, job_id):
> +    self._PickupJobUnlocked(job_id)
> +
>    @locking.ssynchronized(_LOCK)
>    @_RequireOpenQueue
>    def _InspectQueue(self):
> @@ -1723,8 +1761,6 @@ class JobQueue(object):
>      """
>      logging.info("Inspecting job queue")
>
> -    restartjobs = []
> -
>      all_job_ids = self._GetJobIDsUnlocked()
>      jobs_count = len(all_job_ids)
>      lastinfo = time.time()
> @@ -1736,36 +1772,7 @@ class JobQueue(object):
>                       idx, jobs_count - 1, 100.0 * (idx + 1) / jobs_count)
>          lastinfo = time.time()
>
> -      job = self._LoadJobUnlocked(job_id)
> -
> -      # a failure in loading the job can cause 'None' to be returned
> -      if job is None:
> -        continue
> -
> -      status = job.CalcStatus()
> -
> -      if status == constants.JOB_STATUS_QUEUED:
> -        restartjobs.append(job)
> -
> -      elif status in (constants.JOB_STATUS_RUNNING,
> -                      constants.JOB_STATUS_WAITING,
> -                      constants.JOB_STATUS_CANCELING):
> -        logging.warning("Unfinished job %s found: %s", job.id, job)
> -
> -        if status == constants.JOB_STATUS_WAITING:
> -          # Restart job
> -          job.MarkUnfinishedOps(constants.OP_STATUS_QUEUED, None)
> -          restartjobs.append(job)
> -        else:
> -          job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
> -                                "Unclean master daemon shutdown")
> -          job.Finalize()
> -
> -        self.UpdateJobUnlocked(job)
> -
> -    if restartjobs:
> -      logging.info("Restarting %s jobs", len(restartjobs))
> -      self._EnqueueJobsUnlocked(restartjobs)
> +      self._PickupJobUnlocked(job_id)
>
>      logging.info("Job queue inspection finished")
>
> --
> 1.8.4.1
>

LGTM, thanks.
Michele

-- 
Google Germany GmbH
Dienerstr. 12
80331 München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores

Reply via email to