Change lock allocation in mcpu to honor the required minimum of locks to allocate in opportunistic lock allocation.
Signed-off-by: Klaus Aehlig <[email protected]> Reviewed-by: Hrvoje Ribicic <[email protected]> Cherry-picked from f3f1fc574671d41fefb05661b01d4ff93312eef7. Signed-off-by: Klaus Aehlig <[email protected]> --- lib/mcpu.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/lib/mcpu.py b/lib/mcpu.py index 7e061cd..f08c3c9 100644 --- a/lib/mcpu.py +++ b/lib/mcpu.py @@ -335,7 +335,8 @@ class Processor(object): if not self._enable_locks: raise errors.ProgrammerError("Attempted to use disabled locks") - def _AcquireLocks(self, level, names, shared, opportunistic, timeout): + def _AcquireLocks(self, level, names, shared, opportunistic, timeout, + opportunistic_count=1): """Acquires locks via the Ganeti lock manager. @type level: int @@ -396,12 +397,10 @@ class Processor(object): else: request = [[lock, "exclusive"] for lock in locks] - if opportunistic: - logging.debug("Opportunistically acquring some of %s for %s.", - locks, self._wconfdcontext) - locks = self.wconfd.Client().OpportunisticLockUnion(self._wconfdcontext, - request) - elif timeout is None: + if timeout is None: + ## Note: once we are so desperate for locks to request them + ## unconditionally, we no longer care about an original plan + ## to acquire locks opportunistically. logging.info("Definitely requesting %s for %s", request, self._wconfdcontext) ## The only way to be sure of not getting starved is to sequentially @@ -415,6 +414,17 @@ class Processor(object): if not pending: break time.sleep(10.0 * random.random()) + + elif opportunistic: + logging.debug("For %ss trying to opportunistically acquire" + " at least %d of %s for %s.", + timeout, opportunistic_count, locks, self._wconfdcontext) + locks = utils.SimpleRetry( + lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion, + 2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request]) + logging.debug("Managed to get the following locks: %s", locks) + if locks == []: + raise LockAcquireTimeout() else: logging.debug("Trying %ss to request %s for %s", timeout, request, self._wconfdcontext) @@ -538,6 +548,7 @@ class Processor(object): lu.DeclareLocks(level) share = lu.share_locks[level] opportunistic = lu.opportunistic_locks[level] + opportunistic_count = lu.opportunistic_locks_count[level] try: assert adding_locks ^ acquiring_locks, \ @@ -553,7 +564,8 @@ class Processor(object): use_opportunistic = False self._AcquireLocks(level, needed_locks, share, use_opportunistic, - calc_timeout()) + calc_timeout(), + opportunistic_count=opportunistic_count) lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext) result = self._LockAndExecLU(lu, level + 1, calc_timeout) -- 2.0.0.526.g5318336
