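It is better to group per-opcode data (opcode, index, log prefix and summary) in a single context object than to pass these values around separately.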
---
lib/jqueue.py | 55 +++++++++++++++++++++++++++++++++++--------------------
1 files changed, 35 insertions(+), 20 deletions(-)
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 4dd0b53..c7fd4c7 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -745,6 +745,17 @@ def _EncodeOpError(err):
return errors.EncodeException(to_encode)
+class _OpExecContext:
+ def __init__(self, op, index, log_prefix):
+ """Initializes this class.
+
+ """
+ self.op = op
+ self.index = index
+ self.log_prefix = log_prefix
+ self.summary = op.input.Summary()
+
+
class _JobProcessor(object):
def __init__(self, queue, opexec_fn, job):
"""Initializes this class.
@@ -780,8 +791,7 @@ class _JobProcessor(object):
# Found an opcode already marked as running
raise errors.ProgrammerError("Called for job marked as running")
- log_prefix = "Op %s/%s" % (idx + 1, len(job.ops))
- summary = op.input.Summary()
+ opctx = _OpExecContext(op, idx, "Op %s/%s" % (idx + 1, len(job.ops)))
if op.status == constants.OP_STATUS_CANCELED:
# Cancelled jobs are handled by the caller
@@ -794,10 +804,10 @@ class _JobProcessor(object):
# completed successfully (if any did error out, then the whole job
# should have been aborted and not resubmitted for processing).
logging.info("%s: opcode %s already processed, skipping",
- log_prefix, summary)
+ opctx.log_prefix, opctx.summary)
continue
- return (idx, op, log_prefix, summary)
+ return opctx
@staticmethod
def _MarkWaitlock(job, op):
@@ -820,10 +830,12 @@ class _JobProcessor(object):
if job.start_timestamp is None:
job.start_timestamp = op.start_timestamp
- def _ExecOpCodeUnlocked(self, log_prefix, op, summary):
+ def _ExecOpCodeUnlocked(self, opctx):
"""Processes one opcode and returns the result.
"""
+ op = opctx.op
+
assert op.status == constants.OP_STATUS_WAITLOCK
try:
@@ -831,14 +843,16 @@ class _JobProcessor(object):
result = self.opexec_fn(op.input,
_OpExecCallbacks(self.queue, self.job, op))
except CancelJob:
- logging.exception("%s: Canceling job", log_prefix)
+ logging.exception("%s: Canceling job", opctx.log_prefix)
assert op.status == constants.OP_STATUS_CANCELING
return (constants.OP_STATUS_CANCELING, None)
except Exception, err: # pylint: disable-msg=W0703
- logging.exception("%s: Caught exception in %s", log_prefix, summary)
+ logging.exception("%s: Caught exception in %s",
+ opctx.log_prefix, opctx.summary)
return (constants.OP_STATUS_ERROR, _EncodeOpError(err))
else:
- logging.debug("%s: %s successful", log_prefix, summary)
+ logging.debug("%s: %s successful",
+ opctx.log_prefix, opctx.summary)
return (constants.OP_STATUS_SUCCESS, result)
def __call__(self):
@@ -858,12 +872,13 @@ class _JobProcessor(object):
try:
opcount = len(job.ops)
- (opidx, op, log_prefix, op_summary) = self._FindNextOpcode(job)
+ opctx = self._FindNextOpcode(job)
+ op = opctx.op
# Consistency check
assert compat.all(i.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_CANCELED)
- for i in job.ops[opidx:])
+ for i in job.ops[opctx.index:])
assert op.status in (constants.OP_STATUS_QUEUED,
constants.OP_STATUS_WAITLOCK,
@@ -879,12 +894,12 @@ class _JobProcessor(object):
# Write to disk
queue.UpdateJobUnlocked(job)
- logging.info("%s: opcode %s waiting for locks", log_prefix, op_summary)
+ logging.info("%s: opcode %s waiting for locks",
+ opctx.log_prefix, opctx.summary)
queue.release()
try:
- (op_status, op_result) = \
- self._ExecOpCodeUnlocked(log_prefix, op, op_summary)
+ (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
finally:
queue.acquire(shared=1)
@@ -895,21 +910,21 @@ class _JobProcessor(object):
if op.status == constants.OP_STATUS_CANCELING:
assert not compat.any(i.status != constants.OP_STATUS_CANCELING
- for i in job.ops[opidx:])
+ for i in job.ops[opctx.index:])
else:
assert op.status in constants.OPS_FINALIZED
# Ensure all opcodes so far have been successful
- assert (opidx == 0 or
+ assert (opctx.index == 0 or
compat.all(i.status == constants.OP_STATUS_SUCCESS
- for i in job.ops[:opidx]))
+ for i in job.ops[:opctx.index]))
if op.status == constants.OP_STATUS_SUCCESS:
finalize = False
elif op.status == constants.OP_STATUS_ERROR:
# Ensure failed opcode has an exception as its result
- assert errors.GetEncodedError(job.ops[opidx].result)
+ assert errors.GetEncodedError(job.ops[opctx.index].result)
to_encode = errors.OpExecError("Preceding opcode failed")
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
@@ -919,7 +934,7 @@ class _JobProcessor(object):
# Consistency check
assert compat.all(i.status == constants.OP_STATUS_ERROR and
errors.GetEncodedError(i.result)
- for i in job.ops[opidx:])
+ for i in job.ops[opctx.index:])
elif op.status == constants.OP_STATUS_CANCELING:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
@@ -933,7 +948,7 @@ class _JobProcessor(object):
raise errors.ProgrammerError("Unknown status '%s'" % op.status)
# Finalizing or last opcode?
- if finalize or opidx == (opcount - 1):
+ if finalize or opctx.index == (opcount - 1):
# All opcodes have been run, finalize job
job.end_timestamp = TimeStampNow()
@@ -941,7 +956,7 @@ class _JobProcessor(object):
# allowed. Once the file has been written, it can be archived anytime.
queue.UpdateJobUnlocked(job)
- if finalize or opidx == (opcount - 1):
+ if finalize or opctx.index == (opcount - 1):
logging.info("Finished job %s, status = %s", job.id, job.CalcStatus())
return True
--
1.7.0.4