Each job now has its own separate lock, rather than being protected by
the job queue one. Interesting changes are:
- WaitForJobChanges now has to release both the shared jqueue lock and
the job lock (which is done automatically by the wait call) and then
reacquire them in the correct order.
- Callers of UpdateJobUnlocked must hold the job lock or the
notification will fail.
- Some acquisitions of the bjl (big jqueue lock) can now be shared.
Signed-off-by: Guido Trotter <[email protected]>
---
lib/jqueue.py | 222 ++++++++++++++++++++++++++++++++++++--------------------
1 files changed, 143 insertions(+), 79 deletions(-)
diff --git a/lib/jqueue.py b/lib/jqueue.py
index 81e2e40..ed9cabe 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -168,7 +168,7 @@ class _QueuedJob(object):
# pylint: disable-msg=W0212
__slots__ = ["queue", "id", "ops", "log_serial",
"received_timestamp", "start_timestamp", "end_timestamp",
- "lock_status", "change",
+ "lock_status", "change", "_lock", "acquire", "release",
"__weakref__"]
def __init__(self, queue, job_id, ops):
@@ -197,8 +197,12 @@ class _QueuedJob(object):
# In-memory attributes
self.lock_status = None
+ self._lock = threading.Lock()
+ self.acquire = self._lock.acquire
+ self.release = self._lock.release
+
# Condition to wait for changes
- self.change = locking.PipeCondition(_big_jqueue_lock)
+ self.change = locking.PipeCondition(self._lock)
def __repr__(self):
status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
@@ -237,8 +241,12 @@ class _QueuedJob(object):
obj.log_serial = max(obj.log_serial, log_entry[0])
obj.ops.append(op)
+ obj._lock = threading.Lock()
+ obj.acquire = obj._lock.acquire
+ obj.release = obj._lock.release
+
# Condition to wait for changes
- obj.change = locking.PipeCondition(_big_jqueue_lock)
+ obj.change = locking.PipeCondition(obj._lock)
return obj
@@ -383,27 +391,35 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
"""
- assert self._op.status in (constants.OP_STATUS_WAITLOCK,
- constants.OP_STATUS_CANCELING)
+ self._job.acquire()
+ try:
+ assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+ constants.OP_STATUS_CANCELING)
- # All locks are acquired by now
- self._job.lock_status = None
+ # All locks are acquired by now
+ self._job.lock_status = None
- # Cancel here if we were asked to
- if self._op.status == constants.OP_STATUS_CANCELING:
- raise CancelJob()
+ # Cancel here if we were asked to
+ if self._op.status == constants.OP_STATUS_CANCELING:
+ raise CancelJob()
- self._op.status = constants.OP_STATUS_RUNNING
- self._op.exec_timestamp = TimeStampNow()
+ self._op.status = constants.OP_STATUS_RUNNING
+ self._op.exec_timestamp = TimeStampNow()
+ finally:
+ self._job.release()
- @locking.ssynchronized(_big_jqueue_lock)
+ @locking.ssynchronized(_big_jqueue_lock, shared=1)
def _AppendFeedback(self, timestamp, log_type, log_msg):
"""Internal feedback append function, with locks
"""
- self._job.log_serial += 1
- self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
- self._job.change.notifyAll()
+ self._job.acquire()
+ try:
+ self._job.log_serial += 1
+ self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+ self._job.change.notifyAll()
+ finally:
+ self._job.release()
def Feedback(self, *args):
"""Append a log entry.
@@ -467,19 +483,23 @@ class _JobQueueWorker(workerpool.BaseWorker):
logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
op_summary)
- queue.acquire()
+ queue.acquire(shared=1)
try:
- if op.status == constants.OP_STATUS_CANCELED:
- raise CancelJob()
- assert op.status == constants.OP_STATUS_QUEUED
- op.status = constants.OP_STATUS_WAITLOCK
- op.result = None
- op.start_timestamp = TimeStampNow()
- if idx == 0: # first opcode
- job.start_timestamp = op.start_timestamp
- queue.UpdateJobUnlocked(job)
-
- input_opcode = op.input
+ job.acquire()
+ try:
+ if op.status == constants.OP_STATUS_CANCELED:
+ raise CancelJob()
+ assert op.status == constants.OP_STATUS_QUEUED
+ op.status = constants.OP_STATUS_WAITLOCK
+ op.result = None
+ op.start_timestamp = TimeStampNow()
+ if idx == 0: # first opcode
+ job.start_timestamp = op.start_timestamp
+ queue.UpdateJobUnlocked(job)
+
+ input_opcode = op.input
+ finally:
+ job.release()
finally:
queue.release()
@@ -487,12 +507,16 @@ class _JobQueueWorker(workerpool.BaseWorker):
result = proc.ExecOpCode(input_opcode,
_OpExecCallbacks(queue, job, op))
- queue.acquire()
+ queue.acquire(shared=1)
try:
- op.status = constants.OP_STATUS_SUCCESS
- op.result = result
- op.end_timestamp = TimeStampNow()
- queue.UpdateJobUnlocked(job)
+ job.acquire()
+ try:
+ op.status = constants.OP_STATUS_SUCCESS
+ op.result = result
+ op.end_timestamp = TimeStampNow()
+ queue.UpdateJobUnlocked(job)
+ finally:
+ job.release()
finally:
queue.release()
@@ -502,27 +526,35 @@ class _JobQueueWorker(workerpool.BaseWorker):
# Will be handled further up
raise
except Exception, err:
- queue.acquire()
+ queue.acquire(shared=1)
try:
+ job.acquire()
try:
- op.status = constants.OP_STATUS_ERROR
- if isinstance(err, errors.GenericError):
- op.result = errors.EncodeException(err)
- else:
- op.result = str(err)
- op.end_timestamp = TimeStampNow()
- logging.info("Op %s/%s: Error in opcode %s: %s",
- idx + 1, count, op_summary, err)
+ try:
+ op.status = constants.OP_STATUS_ERROR
+ if isinstance(err, errors.GenericError):
+ op.result = errors.EncodeException(err)
+ else:
+ op.result = str(err)
+ op.end_timestamp = TimeStampNow()
+ logging.info("Op %s/%s: Error in opcode %s: %s",
+ idx + 1, count, op_summary, err)
+ finally:
+ queue.UpdateJobUnlocked(job)
finally:
- queue.UpdateJobUnlocked(job)
+ job.release()
finally:
queue.release()
raise
except CancelJob:
- queue.acquire()
+ queue.acquire(shared=1)
try:
- queue.CancelJobUnlocked(job)
+ job.acquire()
+ try:
+ queue.CancelJobUnlocked(job)
+ finally:
+ job.release()
finally:
queue.release()
except errors.GenericError, err:
@@ -530,15 +562,19 @@ class _JobQueueWorker(workerpool.BaseWorker):
except:
logging.exception("Unhandled exception")
finally:
- queue.acquire()
+ queue.acquire(shared=1)
try:
+ job.acquire()
try:
- job.lock_status = None
- job.end_timestamp = TimeStampNow()
- queue.UpdateJobUnlocked(job)
+ try:
+ job.lock_status = None
+ job.end_timestamp = TimeStampNow()
+ queue.UpdateJobUnlocked(job)
+ finally:
+ job_id = job.id
+ status = job.CalcStatus()
finally:
- job_id = job.id
- status = job.CalcStatus()
+ job.release()
finally:
queue.release()
@@ -675,7 +711,11 @@ class JobQueue(object):
job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
"Unclean master daemon shutdown")
finally:
- self.UpdateJobUnlocked(job)
+ job.acquire()
+ try:
+ self.UpdateJobUnlocked(job)
+ finally:
+ job.release()
logging.info("Job queue inspection finished")
finally:
@@ -1070,9 +1110,12 @@ class JobQueue(object):
raise errors.JobQueueFull()
job = _QueuedJob(self, job_id, ops)
-
- # Write to disk
- self.UpdateJobUnlocked(job)
+ job.acquire()
+ try:
+ # Write to disk
+ self.UpdateJobUnlocked(job)
+ finally:
+ job.release()
self._size_lock.acquire()
try:
@@ -1131,6 +1174,8 @@ class JobQueue(object):
@type job: L{_QueuedJob}
@param job: the changed job
+ @warning: call with the job lock held
+
"""
filename = self._GetJobPath(job.id)
data = serializer.DumpJson(job.Serialize(), indent=False)
@@ -1171,6 +1216,17 @@ class JobQueue(object):
logging.debug("Job %s not found", job_id)
return None
+ def _WaitFn(timeout):
+ _big_jqueue_lock.release()
+ try:
+ job.change.wait(timeout)
+ finally:
+ # Reacquire the locks in the correct order.
+ # This should all go when the bjl disappears.
+ job.release()
+ _big_jqueue_lock.acquire(shared=1)
+ job.acquire()
+
def _CheckForChanges():
logging.debug("Waiting for changes in job %s", job_id)
@@ -1198,14 +1254,18 @@ class JobQueue(object):
raise utils.RetryAgain()
+ job.acquire()
try:
- # Setting wait function to release the queue lock while waiting
- return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
- wait_fn=job.change.wait)
- except utils.RetryTimeout:
- return constants.JOB_NOTCHANGED
+ try:
+ # Setting wait function to release the queue lock while waiting
+ return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME,
+ timeout, wait_fn=_WaitFn)
+ except utils.RetryTimeout:
+ return constants.JOB_NOTCHANGED
+ finally:
+ job.release()
- @locking.ssynchronized(_big_jqueue_lock)
+ @locking.ssynchronized(_big_jqueue_lock, shared=1)
@_RequireOpenQueue
def CancelJob(self, job_id):
"""Cancels a job.
@@ -1223,29 +1283,33 @@ class JobQueue(object):
logging.debug("Job %s not found", job_id)
return (False, "Job %s not found" % job_id)
- job_status = job.CalcStatus()
-
- if job_status not in (constants.JOB_STATUS_QUEUED,
- constants.JOB_STATUS_WAITLOCK):
- logging.debug("Job %s is no longer waiting in the queue", job.id)
- return (False, "Job %s is no longer waiting in the queue" % job.id)
-
- if job_status == constants.JOB_STATUS_QUEUED:
- self.CancelJobUnlocked(job)
- return (True, "Job %s canceled" % job.id)
-
- elif job_status == constants.JOB_STATUS_WAITLOCK:
- # The worker will notice the new status and cancel the job
- try:
- job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
- finally:
- self.UpdateJobUnlocked(job)
- return (True, "Job %s will be canceled" % job.id)
+ job.acquire()
+ try:
+ job_status = job.CalcStatus()
+ if job_status not in (constants.JOB_STATUS_QUEUED,
+ constants.JOB_STATUS_WAITLOCK):
+ logging.debug("Job %s is no longer waiting in the queue", job.id)
+ return (False, "Job %s is no longer waiting in the queue" % job.id)
+
+ if job_status == constants.JOB_STATUS_QUEUED:
+ self.CancelJobUnlocked(job)
+ return (True, "Job %s canceled" % job.id)
+ elif job_status == constants.JOB_STATUS_WAITLOCK:
+ # The worker will notice the new status and cancel the job
+ try:
+ job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
+ finally:
+ self.UpdateJobUnlocked(job)
+ return (True, "Job %s will be canceled" % job.id)
+ finally:
+ job.release()
@_RequireOpenQueue
def CancelJobUnlocked(self, job):
"""Marks a job as canceled.
+ @warning: call with the job lock held
+
"""
try:
job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
--
1.7.1