On Thu, Sep 23, 2010 at 12:57 PM, Michael Hanselmann <[email protected]> wrote: > This is better to group per-opcode data. > --- > lib/jqueue.py | 55 +++++++++++++++++++++++++++++++++++-------------------- > 1 files changed, 35 insertions(+), 20 deletions(-) > > diff --git a/lib/jqueue.py b/lib/jqueue.py > index 4dd0b53..c7fd4c7 100644 > --- a/lib/jqueue.py > +++ b/lib/jqueue.py > @@ -745,6 +745,17 @@ def _EncodeOpError(err): > return errors.EncodeException(to_encode) > > > +class _OpExecContext: > + def __init__(self, op, index, log_prefix): > + """Initializes this class. > + > + """ > + self.op = op > + self.index = index > + self.log_prefix = log_prefix > + self.summary = op.input.Summary() > + > + > class _JobProcessor(object): > def __init__(self, queue, opexec_fn, job): > """Initializes this class. > @@ -780,8 +791,7 @@ class _JobProcessor(object): > # Found an opcode already marked as running > raise errors.ProgrammerError("Called for job marked as running") > > - log_prefix = "Op %s/%s" % (idx + 1, len(job.ops)) > - summary = op.input.Summary() > + opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops))) > > if op.status == constants.OP_STATUS_CANCELED: > # Cancelled jobs are handled by the caller > @@ -794,10 +804,10 @@ class _JobProcessor(object): > # completed successfully (if any did error out, then the whole job > # should have been aborted and not resubmitted for processing). > logging.info("%s: opcode %s already processed, skipping", > - log_prefix, summary) > + opctx.log_prefix, opctx.summary) > continue > > - return (idx, op, log_prefix, summary) > + return opctx > > @staticmethod > def _MarkWaitlock(job, op): > @@ -820,10 +830,12 @@ class _JobProcessor(object): > if job.start_timestamp is None: > job.start_timestamp = op.start_timestamp > > - def _ExecOpCodeUnlocked(self, log_prefix, op, summary): > + def _ExecOpCodeUnlocked(self, opctx): > """Processes one opcode and returns the result. 
> > """ > + op = opctx.op > + > assert op.status == constants.OP_STATUS_WAITLOCK > > try: > @@ -831,14 +843,16 @@ class _JobProcessor(object): > result = self.opexec_fn(op.input, > _OpExecCallbacks(self.queue, self.job, op)) > except CancelJob: > - logging.exception("%s: Canceling job", log_prefix) > + logging.exception("%s: Canceling job", opctx.log_prefix) > assert op.status == constants.OP_STATUS_CANCELING > return (constants.OP_STATUS_CANCELING, None) > except Exception, err: # pylint: disable-msg=W0703 > - logging.exception("%s: Caught exception in %s", log_prefix, summary) > + logging.exception("%s: Caught exception in %s", > + opctx.log_prefix, opctx.summary) > return (constants.OP_STATUS_ERROR, _EncodeOpError(err)) > else: > - logging.debug("%s: %s successful", log_prefix, summary) > + logging.debug("%s: %s successful", > + opctx.log_prefix, opctx.summary) > return (constants.OP_STATUS_SUCCESS, result) > > def __call__(self): > @@ -858,12 +872,13 @@ class _JobProcessor(object): > try: > opcount = len(job.ops) > > - (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job) > + opctx = self._FindNextOpcode(job) > + op = opctx.op > > # Consistency check > assert compat.all(i.status in (constants.OP_STATUS_QUEUED, > constants.OP_STATUS_CANCELED) > - for i in job.ops[opidx:]) > + for i in job.ops[opctx.index:]) > > assert op.status in (constants.OP_STATUS_QUEUED, > constants.OP_STATUS_WAITLOCK, > @@ -879,12 +894,12 @@ class _JobProcessor(object): > # Write to disk > queue.UpdateJobUnlocked(job) > > - logging.info("%s: opcode %s waiting for locks", log_prefix, > op_summary) > + logging.info("%s: opcode %s waiting for locks", > + opctx.log_prefix, opctx.summary) > > queue.release() > try: > - (op_status, op_result) = \ > - self._ExecOpCodeUnlocked(log_prefix, op, op_summary) > + (op_status, op_result) = self._ExecOpCodeUnlocked(opctx) > finally: > queue.acquire(shared=1) > > @@ -895,21 +910,21 @@ class _JobProcessor(object): > > if op.status == 
constants.OP_STATUS_CANCELING: > assert not compat.any(i.status != constants.OP_STATUS_CANCELING > - for i in job.ops[opidx:]) > + for i in job.ops[opctx.index:]) > else: > assert op.status in constants.OPS_FINALIZED > > # Ensure all opcodes so far have been successful > - assert (opidx == 0 or > + assert (opctx.index == 0 or > compat.all(i.status == constants.OP_STATUS_SUCCESS > - for i in job.ops[:opidx])) > + for i in job.ops[:opctx.index])) > > if op.status == constants.OP_STATUS_SUCCESS: > finalize = False > > elif op.status == constants.OP_STATUS_ERROR: > # Ensure failed opcode has an exception as its result > - assert errors.GetEncodedError(job.ops[opidx].result) > + assert errors.GetEncodedError(job.ops[opctx.index].result) > > to_encode = errors.OpExecError("Preceding opcode failed") > job.MarkUnfinishedOps(constants.OP_STATUS_ERROR, > @@ -919,7 +934,7 @@ class _JobProcessor(object): > # Consistency check > assert compat.all(i.status == constants.OP_STATUS_ERROR and > errors.GetEncodedError(i.result) > - for i in job.ops[opidx:]) > + for i in job.ops[opctx.index:]) > > elif op.status == constants.OP_STATUS_CANCELING: > job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED, > @@ -933,7 +948,7 @@ class _JobProcessor(object): > raise errors.ProgrammerError("Unknown status '%s'" % op.status) > > # Finalizing or last opcode? > - if finalize or opidx == (opcount - 1): > + if finalize or opctx.index == (opcount - 1): > # All opcodes have been run, finalize job > job.end_timestamp = TimeStampNow() > > @@ -941,7 +956,7 @@ class _JobProcessor(object): > # allowed. Once the file has been written, it can be archived anytime. > queue.UpdateJobUnlocked(job) > > - if finalize or opidx == (opcount - 1): > + if finalize or opctx.index == (opcount - 1): > logging.info("Finished job %s, status = %s", job.id, job.CalcStatus()) > return True > > -- > 1.7.0.4
LGTM
