On Thu, Sep 23, 2010 at 12:57 PM, Michael Hanselmann <[email protected]> wrote:
> This is better to group per-opcode data.
> ---
>  lib/jqueue.py |   55 +++++++++++++++++++++++++++++++++++--------------------
>  1 files changed, 35 insertions(+), 20 deletions(-)
>
> diff --git a/lib/jqueue.py b/lib/jqueue.py
> index 4dd0b53..c7fd4c7 100644
> --- a/lib/jqueue.py
> +++ b/lib/jqueue.py
> @@ -745,6 +745,17 @@ def _EncodeOpError(err):
>   return errors.EncodeException(to_encode)
>
>
> +class _OpExecContext:
> +  def __init__(self, op, index, log_prefix):
> +    """Initializes this class.
> +
> +    """
> +    self.op = op
> +    self.index = index
> +    self.log_prefix = log_prefix
> +    self.summary = op.input.Summary()
> +
> +
>  class _JobProcessor(object):
>   def __init__(self, queue, opexec_fn, job):
>     """Initializes this class.
> @@ -780,8 +791,7 @@ class _JobProcessor(object):
>         # Found an opcode already marked as running
>         raise errors.ProgrammerError("Called for job marked as running")
>
> -      log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
> -      summary = op.input.Summary()
> +      opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
>
>       if op.status == constants.OP_STATUS_CANCELED:
>         # Cancelled jobs are handled by the caller
> @@ -794,10 +804,10 @@ class _JobProcessor(object):
>         # completed successfully (if any did error out, then the whole job
>         # should have been aborted and not resubmitted for processing).
>         logging.info("%s: opcode %s already processed, skipping",
> -                     log_prefix, summary)
> +                     opctx.log_prefix, opctx.summary)
>         continue
>
> -      return (idx, op, log_prefix, summary)
> +      return opctx
>
>   @staticmethod
>   def _MarkWaitlock(job, op):
> @@ -820,10 +830,12 @@ class _JobProcessor(object):
>     if job.start_timestamp is None:
>       job.start_timestamp = op.start_timestamp
>
> -  def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
> +  def _ExecOpCodeUnlocked(self, opctx):
>     """Processes one opcode and returns the result.
>
>     """
> +    op = opctx.op
> +
>     assert op.status == constants.OP_STATUS_WAITLOCK
>
>     try:
> @@ -831,14 +843,16 @@ class _JobProcessor(object):
>       result = self.opexec_fn(op.input,
>                               _OpExecCallbacks(self.queue, self.job, op))
>     except CancelJob:
> -      logging.exception("%s: Canceling job", log_prefix)
> +      logging.exception("%s: Canceling job", opctx.log_prefix)
>       assert op.status == constants.OP_STATUS_CANCELING
>       return (constants.OP_STATUS_CANCELING, None)
>     except Exception, err: # pylint: disable-msg=W0703
> -      logging.exception("%s: Caught exception in %s", log_prefix, summary)
> +      logging.exception("%s: Caught exception in %s",
> +                        opctx.log_prefix, opctx.summary)
>       return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
>     else:
> -      logging.debug("%s: %s successful", log_prefix, summary)
> +      logging.debug("%s: %s successful",
> +                    opctx.log_prefix, opctx.summary)
>       return (constants.OP_STATUS_SUCCESS, result)
>
>   def __call__(self):
> @@ -858,12 +872,13 @@ class _JobProcessor(object):
>     try:
>       opcount = len(job.ops)
>
> -      (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)
> +      opctx = self._FindNextOpcode(job)
> +      op = opctx.op
>
>       # Consistency check
>       assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
>                                      constants.OP_STATUS_CANCELED)
> -                        for i in job.ops[opidx:])
> +                        for i in job.ops[opctx.index:])
>
>       assert op.status in (constants.OP_STATUS_QUEUED,
>                            constants.OP_STATUS_WAITLOCK,
> @@ -879,12 +894,12 @@ class _JobProcessor(object):
>         # Write to disk
>         queue.UpdateJobUnlocked(job)
>
> -        logging.info("%s: opcode %s waiting for locks", log_prefix, 
> op_summary)
> +        logging.info("%s: opcode %s waiting for locks",
> +                     opctx.log_prefix, opctx.summary)
>
>         queue.release()
>         try:
> -          (op_status, op_result) = \
> -            self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
> +          (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
>         finally:
>           queue.acquire(shared=1)
>
> @@ -895,21 +910,21 @@ class _JobProcessor(object):
>
>         if op.status == constants.OP_STATUS_CANCELING:
>           assert not compat.any(i.status != constants.OP_STATUS_CANCELING
> -                                for i in job.ops[opidx:])
> +                                for i in job.ops[opctx.index:])
>         else:
>           assert op.status in constants.OPS_FINALIZED
>
>       # Ensure all opcodes so far have been successful
> -      assert (opidx == 0 or
> +      assert (opctx.index == 0 or
>               compat.all(i.status == constants.OP_STATUS_SUCCESS
> -                         for i in job.ops[:opidx]))
> +                         for i in job.ops[:opctx.index]))
>
>       if op.status == constants.OP_STATUS_SUCCESS:
>         finalize = False
>
>       elif op.status == constants.OP_STATUS_ERROR:
>         # Ensure failed opcode has an exception as its result
> -        assert errors.GetEncodedError(job.ops[opidx].result)
> +        assert errors.GetEncodedError(job.ops[opctx.index].result)
>
>         to_encode = errors.OpExecError("Preceding opcode failed")
>         job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
> @@ -919,7 +934,7 @@ class _JobProcessor(object):
>         # Consistency check
>         assert compat.all(i.status == constants.OP_STATUS_ERROR and
>                           errors.GetEncodedError(i.result)
> -                          for i in job.ops[opidx:])
> +                          for i in job.ops[opctx.index:])
>
>       elif op.status == constants.OP_STATUS_CANCELING:
>         job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
> @@ -933,7 +948,7 @@ class _JobProcessor(object):
>         raise errors.ProgrammerError("Unknown status '%s'" % op.status)
>
>       # Finalizing or last opcode?
> -      if finalize or opidx == (opcount - 1):
> +      if finalize or opctx.index == (opcount - 1):
>         # All opcodes have been run, finalize job
>         job.end_timestamp = TimeStampNow()
>
> @@ -941,7 +956,7 @@ class _JobProcessor(object):
>       # allowed. Once the file has been written, it can be archived anytime.
>       queue.UpdateJobUnlocked(job)
>
> -      if finalize or opidx == (opcount - 1):
> +      if finalize or opctx.index == (opcount - 1):
>         logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
>         return True
>
> --
> 1.7.0.4

LGTM

>
>

Reply via email to