...as in a single-threaded process there are no conflicts.
Signed-off-by: Klaus Aehlig <[email protected]>
---
lib/jqueue/__init__.py | 34 +---------------------------------
test/py/ganeti.jqueue_unittest.py | 37 -------------------------------------
2 files changed, 1 insertion(+), 70 deletions(-)
diff --git a/lib/jqueue/__init__.py b/lib/jqueue/__init__.py
index 0aa9665..286f929 100644
--- a/lib/jqueue/__init__.py
+++ b/lib/jqueue/__init__.py
@@ -30,9 +30,6 @@
"""Module implementing the job queue handling.
-Locking: there's a single, large lock in the L{JobQueue} class. It's
-used by all other classes in this module.
-
"""
import logging
@@ -73,10 +70,6 @@ from ganeti import vcluster
from ganeti.cmdlib import cluster
-# member lock names to be passed to @ssynchronized decorator
-_LOCK = "_lock"
-_QUEUE = "_queue"
-
#: Retrieves "id" attribute
_GetIdAttr = operator.attrgetter("id")
@@ -578,7 +571,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
logging.debug("Canceling opcode")
raise CancelJob()
- @locking.ssynchronized(_QUEUE, shared=1)
def NotifyStart(self):
"""Mark the opcode as running, not lock-waiting.
@@ -603,7 +595,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
# And finally replicate the job status
self._queue.UpdateJobUnlocked(self._job)
- @locking.ssynchronized(_QUEUE, shared=1)
def NotifyRetry(self):
"""Mark opcode again as lock-waiting.
@@ -614,7 +605,6 @@ class _OpExecCallbacks(mcpu.OpExecCbBase):
self._op.status = constants.OP_STATUS_WAITING
logging.debug("Opcode will be retried. Back to waiting.")
- @locking.ssynchronized(_QUEUE, shared=1)
def _AppendFeedback(self, timestamp, log_type, log_msg):
"""Internal feedback append function, with locks
@@ -966,7 +956,6 @@ class _JobProcessor(object):
logging.debug("Processing job %s", job.id)
- queue.acquire(shared=1)
try:
opcount = len(job.ops)
@@ -1029,11 +1018,7 @@ class _JobProcessor(object):
assert not opctx.jobdeps, "Not all dependencies were removed"
- queue.release()
- try:
- (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
- finally:
- queue.acquire(shared=1)
+ (op_status, op_result) = self._ExecOpCodeUnlocked(opctx)
op.status = op_status
op.result = op_result
@@ -1138,7 +1123,6 @@ class _JobProcessor(object):
return self.DEFER
finally:
assert job.writable, "Job became read-only while being processed"
- queue.release()
class _JobDependencyManager:
@@ -1158,9 +1142,7 @@ class _JobDependencyManager:
self._getstatus_fn = getstatus_fn
self._waiters = {}
- self._lock = locking.SharedLock("JobDepMgr")
- @locking.ssynchronized(_LOCK, shared=1)
def JobWaiting(self, job):
"""Checks if a job is waiting.
@@ -1168,7 +1150,6 @@ class _JobDependencyManager:
return compat.any(job in jobs
for jobs in self._waiters.values())
- @locking.ssynchronized(_LOCK)
def CheckAndRegister(self, job, dep_job_id, dep_status):
"""Checks if a dependency job has the requested status.
@@ -1257,16 +1238,6 @@ class JobQueue(object):
self._memcache = weakref.WeakValueDictionary()
self._my_hostname = netutils.Hostname.GetSysName()
- # The Big JobQueue lock. If a code block or method acquires it in shared
- # mode safe it must guarantee concurrency with all the code acquiring it in
- # shared mode, including itself. In order not to acquire it at all
- # concurrency must be guaranteed with all code acquiring it in shared mode
- # and all code acquiring it exclusively.
- self._lock = locking.SharedLock("JobQueue")
-
- self.acquire = self._lock.acquire
- self.release = self._lock.release
-
# Get initial list of nodes
self._nodes = dict((n.name, n.primary_ip)
for n in cfg.GetAllNodesInfo().values()
@@ -1290,7 +1261,6 @@ class JobQueue(object):
"""
return rpc.JobQueueRunner(self.context, address_list)
- @locking.ssynchronized(_LOCK)
def AddNode(self, node):
"""Register a new node with the queue.
@@ -1730,7 +1700,6 @@ class JobQueue(object):
else:
return None
- @locking.ssynchronized(_LOCK)
def CancelJob(self, job_id):
"""Cancels a job.
@@ -1744,7 +1713,6 @@ class JobQueue(object):
return self._ModifyJobUnlocked(job_id, lambda job: job.Cancel())
- @locking.ssynchronized(_LOCK)
def ChangeJobPriority(self, job_id, priority):
"""Changes a job's priority.
diff --git a/test/py/ganeti.jqueue_unittest.py b/test/py/ganeti.jqueue_unittest.py
index c13749d..c258f65 100755
--- a/test/py/ganeti.jqueue_unittest.py
+++ b/test/py/ganeti.jqueue_unittest.py
@@ -516,7 +516,6 @@ class _DisabledFakeDependencyManager:
class _FakeQueueForProc:
def __init__(self, depmgr=None):
- self._acquired = False
self._updates = []
self._submitted = []
@@ -527,29 +526,16 @@ class _FakeQueueForProc:
else:
self.depmgr = _DisabledFakeDependencyManager()
- def IsAcquired(self):
- return self._acquired
-
def GetNextUpdate(self):
return self._updates.pop(0)
def GetNextSubmittedJob(self):
return self._submitted.pop(0)
- def acquire(self, shared=0):
- assert shared == 1
- self._acquired = True
-
- def release(self):
- assert self._acquired
- self._acquired = False
-
def UpdateJobUnlocked(self, job, replicate=True):
- assert self._acquired, "Lock not acquired while updating job"
self._updates.append((job, bool(replicate)))
def SubmitManyJobs(self, jobs):
- assert not self._acquired, "Lock acquired while submitting jobs"
job_ids = [self._submit_count.next() for _ in jobs]
self._submitted.extend(zip(job_ids, jobs))
return job_ids
@@ -563,8 +549,6 @@ class _FakeExecOpCodeForProc:
def __call__(self, op, cbs, timeout=None):
assert isinstance(op, opcodes.OpTestDummy)
- assert not self._queue.IsAcquired(), \
- "Queue lock not released when executing opcode"
if self._before_start:
self._before_start(timeout, cbs.CurrentPriority())
@@ -574,9 +558,6 @@ class _FakeExecOpCodeForProc:
if self._after_start:
self._after_start(op, cbs)
- # Check again after the callbacks
- assert not self._queue.IsAcquired()
-
if op.fail:
raise errors.OpExecError("Error requested (%s)" % op.result)
@@ -620,7 +601,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
@@ -628,7 +608,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
@@ -820,7 +799,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
@@ -834,7 +812,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
opexec = _FakeExecOpCodeForProc(queue, _BeforeStart, _AfterStart)
@@ -865,7 +842,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_QUEUED)
def _BeforeStart(timeout, priority):
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
# Mark as cancelled
@@ -1049,13 +1025,11 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
def _AfterStart(op, cbs):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertRaises(AssertionError, cbs.Feedback,
@@ -1140,7 +1114,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
def _BeforeStart(timeout, priority):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
@@ -1148,7 +1121,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
@@ -1217,7 +1189,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Job should only be updated when it wasn't waiting for another job
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
@@ -1225,7 +1196,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
@@ -1336,7 +1306,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Job should only be updated when it wasn't waiting for another job
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
@@ -1344,7 +1313,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
@@ -1445,7 +1413,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
# Job should only be updated when it wasn't waiting for another job
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
self.assertFalse(job.cur_opctx)
@@ -1453,7 +1420,6 @@ class TestJobProcessor(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, queue.GetNextUpdate)
- self.assertFalse(queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
self.assertFalse(job.cur_opctx)
@@ -1570,7 +1536,6 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
- self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_WAITING)
ts = self.timeout_strategy
@@ -1608,7 +1573,6 @@ class TestJobProcessorTimeouts(unittest.TestCase, _JobProcessorTestUtils):
self.assertEqual(self.queue.GetNextUpdate(), (job, True))
self.assertRaises(IndexError, self.queue.GetNextUpdate)
- self.assertFalse(self.queue.IsAcquired())
self.assertEqual(job.CalcStatus(), constants.JOB_STATUS_RUNNING)
# Job is running, cancelling shouldn't be possible
@@ -1765,7 +1729,6 @@ class TestJobProcessorChangePriority(unittest.TestCase, _JobProcessorTestUtils):
self.opexecprio = []
def _BeforeStart(self, timeout, priority):
- self.assertFalse(self.queue.IsAcquired())
self.opexecprio.append(priority)
def testChangePriorityWhileRunning(self):
--
2.2.0.rc0.207.ga3a616c