Each job has now its own separate lock, rather than being protected by
the job queue one. Interesting changes are:
  - WaitForJobChanges now has to release both the shared jqueue lock and
    the job lock (which is done automatically by the wait call) and then
    reacquire them in the correct order.
  - Callers of UpdateJobUnlocked must hold the job lock or the
    notification will fail.
  - Some acquire of the bjl can now be shared.

Signed-off-by: Guido Trotter <[email protected]>
---
 lib/jqueue.py |  222 ++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 143 insertions(+), 79 deletions(-)

diff --git a/lib/jqueue.py b/lib/jqueue.py
index 81e2e40..ed9cabe 100644
--- a/lib/jqueue.py
+++ b/lib/jqueue.py
@@ -168,7 +168,7 @@ class _QueuedJob(object):
   # pylint: disable-msg=W0212
   __slots__ = ["queue", "id", "ops", "log_serial",
                "received_timestamp", "start_timestamp", "end_timestamp",
-               "lock_status", "change",
+               "lock_status", "change", "_lock", "acquire", "release",
                "__weakref__"]
 
   def __init__(self, queue, job_id, ops):
@@ -197,8 +197,12 @@ class _QueuedJob(object):
     # In-memory attributes
     self.lock_status = None
 
+    self._lock = threading.Lock()
+    self.acquire = self._lock.acquire
+    self.release = self._lock.release
+
     # Condition to wait for changes
-    self.change = locking.PipeCondition(_big_jqueue_lock)
+    self.change = locking.PipeCondition(self._lock)
 
   def __repr__(self):
     status = ["%s.%s" % (self.__class__.__module__, self.__class__.__name__),
@@ -237,8 +241,12 @@ class _QueuedJob(object):
         obj.log_serial = max(obj.log_serial, log_entry[0])
       obj.ops.append(op)
 
+    obj._lock = threading.Lock()
+    obj.acquire = obj._lock.acquire
+    obj.release = obj._lock.release
+
     # Condition to wait for changes
-    obj.change = locking.PipeCondition(_big_jqueue_lock)
+    obj.change = locking.PipeCondition(obj._lock)
 
     return obj
 
@@ -383,27 +391,35 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
     Processor.ExecOpCode) set to OP_STATUS_WAITLOCK.
 
     """
-    assert self._op.status in (constants.OP_STATUS_WAITLOCK,
-                               constants.OP_STATUS_CANCELING)
+    self._job.acquire()
+    try:
+      assert self._op.status in (constants.OP_STATUS_WAITLOCK,
+                                 constants.OP_STATUS_CANCELING)
 
-    # All locks are acquired by now
-    self._job.lock_status = None
+      # All locks are acquired by now
+      self._job.lock_status = None
 
-    # Cancel here if we were asked to
-    if self._op.status == constants.OP_STATUS_CANCELING:
-      raise CancelJob()
+      # Cancel here if we were asked to
+      if self._op.status == constants.OP_STATUS_CANCELING:
+        raise CancelJob()
 
-    self._op.status = constants.OP_STATUS_RUNNING
-    self._op.exec_timestamp = TimeStampNow()
+      self._op.status = constants.OP_STATUS_RUNNING
+      self._op.exec_timestamp = TimeStampNow()
+    finally:
+      self._job.release()
 
-  @locking.ssynchronized(_big_jqueue_lock)
+  @locking.ssynchronized(_big_jqueue_lock, shared=1)
   def _AppendFeedback(self, timestamp, log_type, log_msg):
     """Internal feedback append function, with locks
 
     """
-    self._job.log_serial += 1
-    self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
-    self._job.change.notifyAll()
+    self._job.acquire()
+    try:
+      self._job.log_serial += 1
+      self._op.log.append((self._job.log_serial, timestamp, log_type, log_msg))
+      self._job.change.notifyAll()
+    finally:
+      self._job.release()
 
   def Feedback(self, *args):
     """Append a log entry.
@@ -467,19 +483,23 @@ class _JobQueueWorker(workerpool.BaseWorker):
             logging.info("Op %s/%s: Starting opcode %s", idx + 1, count,
                          op_summary)
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
-              if op.status == constants.OP_STATUS_CANCELED:
-                raise CancelJob()
-              assert op.status == constants.OP_STATUS_QUEUED
-              op.status = constants.OP_STATUS_WAITLOCK
-              op.result = None
-              op.start_timestamp = TimeStampNow()
-              if idx == 0: # first opcode
-                job.start_timestamp = op.start_timestamp
-              queue.UpdateJobUnlocked(job)
-
-              input_opcode = op.input
+              job.acquire()
+              try:
+                if op.status == constants.OP_STATUS_CANCELED:
+                  raise CancelJob()
+                assert op.status == constants.OP_STATUS_QUEUED
+                op.status = constants.OP_STATUS_WAITLOCK
+                op.result = None
+                op.start_timestamp = TimeStampNow()
+                if idx == 0: # first opcode
+                  job.start_timestamp = op.start_timestamp
+                queue.UpdateJobUnlocked(job)
+
+                input_opcode = op.input
+              finally:
+                job.release()
             finally:
               queue.release()
 
@@ -487,12 +507,16 @@ class _JobQueueWorker(workerpool.BaseWorker):
             result = proc.ExecOpCode(input_opcode,
                                      _OpExecCallbacks(queue, job, op))
 
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
-              op.status = constants.OP_STATUS_SUCCESS
-              op.result = result
-              op.end_timestamp = TimeStampNow()
-              queue.UpdateJobUnlocked(job)
+              job.acquire()
+              try:
+                op.status = constants.OP_STATUS_SUCCESS
+                op.result = result
+                op.end_timestamp = TimeStampNow()
+                queue.UpdateJobUnlocked(job)
+              finally:
+                job.release()
             finally:
               queue.release()
 
@@ -502,27 +526,35 @@ class _JobQueueWorker(workerpool.BaseWorker):
             # Will be handled further up
             raise
           except Exception, err:
-            queue.acquire()
+            queue.acquire(shared=1)
             try:
+              job.acquire()
               try:
-                op.status = constants.OP_STATUS_ERROR
-                if isinstance(err, errors.GenericError):
-                  op.result = errors.EncodeException(err)
-                else:
-                  op.result = str(err)
-                op.end_timestamp = TimeStampNow()
-                logging.info("Op %s/%s: Error in opcode %s: %s",
-                             idx + 1, count, op_summary, err)
+                try:
+                  op.status = constants.OP_STATUS_ERROR
+                  if isinstance(err, errors.GenericError):
+                    op.result = errors.EncodeException(err)
+                  else:
+                    op.result = str(err)
+                  op.end_timestamp = TimeStampNow()
+                  logging.info("Op %s/%s: Error in opcode %s: %s",
+                               idx + 1, count, op_summary, err)
+                finally:
+                  queue.UpdateJobUnlocked(job)
               finally:
-                queue.UpdateJobUnlocked(job)
+                job.release()
             finally:
               queue.release()
             raise
 
       except CancelJob:
-        queue.acquire()
+        queue.acquire(shared=1)
         try:
-          queue.CancelJobUnlocked(job)
+          job.acquire()
+          try:
+            queue.CancelJobUnlocked(job)
+          finally:
+            job.release()
         finally:
           queue.release()
       except errors.GenericError, err:
@@ -530,15 +562,19 @@ class _JobQueueWorker(workerpool.BaseWorker):
       except:
         logging.exception("Unhandled exception")
     finally:
-      queue.acquire()
+      queue.acquire(shared=1)
       try:
+        job.acquire()
         try:
-          job.lock_status = None
-          job.end_timestamp = TimeStampNow()
-          queue.UpdateJobUnlocked(job)
+          try:
+            job.lock_status = None
+            job.end_timestamp = TimeStampNow()
+            queue.UpdateJobUnlocked(job)
+          finally:
+            job_id = job.id
+            status = job.CalcStatus()
         finally:
-          job_id = job.id
-          status = job.CalcStatus()
+          job.release()
       finally:
         queue.release()
 
@@ -675,7 +711,11 @@ class JobQueue(object):
               job.MarkUnfinishedOps(constants.OP_STATUS_ERROR,
                                     "Unclean master daemon shutdown")
             finally:
-              self.UpdateJobUnlocked(job)
+              job.acquire()
+              try:
+                self.UpdateJobUnlocked(job)
+              finally:
+                job.release()
 
         logging.info("Job queue inspection finished")
       finally:
@@ -1070,9 +1110,12 @@ class JobQueue(object):
       raise errors.JobQueueFull()
 
     job = _QueuedJob(self, job_id, ops)
-
-    # Write to disk
-    self.UpdateJobUnlocked(job)
+    job.acquire()
+    try:
+      # Write to disk
+      self.UpdateJobUnlocked(job)
+    finally:
+      job.release()
 
     self._size_lock.acquire()
     try:
@@ -1131,6 +1174,8 @@ class JobQueue(object):
     @type job: L{_QueuedJob}
     @param job: the changed job
 
+    @warning: call with the job lock held
+
     """
     filename = self._GetJobPath(job.id)
     data = serializer.DumpJson(job.Serialize(), indent=False)
@@ -1171,6 +1216,17 @@ class JobQueue(object):
       logging.debug("Job %s not found", job_id)
       return None
 
+    def _WaitFn(timeout):
+      _big_jqueue_lock.release()
+      try:
+        job.change.wait(timeout)
+      finally:
+        # Reacquire the locks in the correct order.
+        # This should all go when the bjl disappears.
+        job.release()
+        _big_jqueue_lock.acquire(shared=1)
+        job.acquire()
+
     def _CheckForChanges():
       logging.debug("Waiting for changes in job %s", job_id)
 
@@ -1198,14 +1254,18 @@ class JobQueue(object):
 
       raise utils.RetryAgain()
 
+    job.acquire()
     try:
-      # Setting wait function to release the queue lock while waiting
-      return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME, timeout,
-                         wait_fn=job.change.wait)
-    except utils.RetryTimeout:
-      return constants.JOB_NOTCHANGED
+      try:
+        # Setting wait function to release the queue lock while waiting
+        return utils.Retry(_CheckForChanges, utils.RETRY_REMAINING_TIME,
+                           timeout, wait_fn=_WaitFn)
+      except utils.RetryTimeout:
+        return constants.JOB_NOTCHANGED
+    finally:
+      job.release()
 
-  @locking.ssynchronized(_big_jqueue_lock)
+  @locking.ssynchronized(_big_jqueue_lock, shared=1)
   @_RequireOpenQueue
   def CancelJob(self, job_id):
     """Cancels a job.
@@ -1223,29 +1283,33 @@ class JobQueue(object):
       logging.debug("Job %s not found", job_id)
       return (False, "Job %s not found" % job_id)
 
-    job_status = job.CalcStatus()
-
-    if job_status not in (constants.JOB_STATUS_QUEUED,
-                          constants.JOB_STATUS_WAITLOCK):
-      logging.debug("Job %s is no longer waiting in the queue", job.id)
-      return (False, "Job %s is no longer waiting in the queue" % job.id)
-
-    if job_status == constants.JOB_STATUS_QUEUED:
-      self.CancelJobUnlocked(job)
-      return (True, "Job %s canceled" % job.id)
-
-    elif job_status == constants.JOB_STATUS_WAITLOCK:
-      # The worker will notice the new status and cancel the job
-      try:
-        job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
-      finally:
-        self.UpdateJobUnlocked(job)
-      return (True, "Job %s will be canceled" % job.id)
+    job.acquire()
+    try:
+      job_status = job.CalcStatus()
+      if job_status not in (constants.JOB_STATUS_QUEUED,
+                            constants.JOB_STATUS_WAITLOCK):
+        logging.debug("Job %s is no longer waiting in the queue", job.id)
+        return (False, "Job %s is no longer waiting in the queue" % job.id)
+
+      if job_status == constants.JOB_STATUS_QUEUED:
+        self.CancelJobUnlocked(job)
+        return (True, "Job %s canceled" % job.id)
+      elif job_status == constants.JOB_STATUS_WAITLOCK:
+        # The worker will notice the new status and cancel the job
+        try:
+          job.MarkUnfinishedOps(constants.OP_STATUS_CANCELING, None)
+        finally:
+          self.UpdateJobUnlocked(job)
+        return (True, "Job %s will be canceled" % job.id)
+    finally:
+      job.release()
 
   @_RequireOpenQueue
   def CancelJobUnlocked(self, job):
     """Marks a job as canceled.
 
+    @warning: call with the job lock held
+
     """
     try:
       job.MarkUnfinishedOps(constants.OP_STATUS_CANCELED,
-- 
1.7.1

Reply via email to