LGTM, thanks.
On Mon, Feb 24, 2014 at 2:33 PM, Hrvoje Ribicic <[email protected]> wrote: > This should improve the situation: > > diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py > index 1207e68..fc068e2 100644 > --- a/qa/qa_job_utils.py > +++ b/qa/qa_job_utils.py > @@ -125,7 +125,7 @@ def _GetNodeUUIDMap(nodes): > > > def _FindLockNames(locks): > - """ Finds a mapping of internal lock names to the names of entities > locked. > + """ Finds the ids and descriptions of locks that given locks can block. > > @type locks: dict of locking level to list > @param locks: The locks that gnt-debug delay is holding. > @@ -133,6 +133,10 @@ def _FindLockNames(locks): > @rtype: dict of string to string > @return: The lock name to entity name map. > > + For a given set of locks, some internal locks (e.g. ALL_SET locks) can > be > + blocked even though they were not listed explicitly. This function has > to take > + care and list all locks that can be blocked by the locks given as > parameters. > + > """ > lock_map = {} > > @@ -147,8 +151,8 @@ def _FindLockNames(locks): > for name in name_uuid_map: > lock_map["node/%s" % name_uuid_map[name]] = name > > - # With locking.ALL_SET being defined as None, only node_locks does not > - # suffice > + # If ALL_SET was requested explicitly, or there is at least one lock > + # Note that locking.ALL_SET is None and hence the strange form of the > if > if node_locks == locking.ALL_SET or node_locks: > lock_map["node/[lockset]"] = "joint node lock" > > > > > On Mon, Feb 24, 2014 at 12:46 PM, Petr Pudlák <[email protected]> wrote: > >> >> >> >> On Tue, Feb 18, 2014 at 3:39 PM, Hrvoje Ribicic <[email protected]> wrote: >> >>> This patch adds threading to the RunWithLocks function, allowing one >>> thread to execute the QA test, and the other to monitor if it is being >>> blocked by locks set up during the test. If it is, terminate the >>> blocking job, and let the QA continue, reporting the test failure at >>> the very end. 
>>> >>> Signed-off-by: Hrvoje Ribicic <[email protected]> >>> --- >>> qa/qa_job_utils.py | 130 >>> +++++++++++++++++++++++++++++++++++++++++++++++++---- >>> 1 file changed, 122 insertions(+), 8 deletions(-) >>> >>> diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py >>> index bc4733c..03f8ee1 100644 >>> --- a/qa/qa_job_utils.py >>> +++ b/qa/qa_job_utils.py >>> @@ -24,6 +24,8 @@ >>> """ >>> >>> import re >>> +import threading >>> +import time >>> >>> from ganeti import constants >>> from ganeti import locking >>> @@ -38,7 +40,7 @@ from qa_utils import AssertCommand, GetCommandOutput, >>> GetObjectInfo >>> AVAILABLE_LOCKS = [locking.LEVEL_NODE, ] >>> >>> >>> -def _GetOutputFromMaster(cmd): >>> +def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True): >>> """ Gets the output of a command executed on master. >>> >>> """ >>> @@ -51,7 +53,8 @@ def _GetOutputFromMaster(cmd): >>> # buildbot >>> cmdstr += " 2>&1" >>> >>> - return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr) >>> + return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr, >>> + use_multiplexer=use_multiplexer, >>> log_cmd=log_cmd) >>> >>> >>> def ExecuteJobProducingCommand(cmd): >>> @@ -107,6 +110,83 @@ def _TerminateDelayFunction(termination_socket): >>> AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" % >>> termination_socket) >>> >>> >>> +def _GetNodeUUIDMap(nodes): >>> + """ Given a list of nodes, retrieves a mapping of their names to >>> UUIDs. >>> + >>> + @type nodes: list of string >>> + @param nodes: The nodes to retrieve a map for. If empty, returns >>> information >>> + for all the nodes. >>> + >>> + """ >>> + cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"] >>> + cmd.extend(nodes) >>> + output = _GetOutputFromMaster(cmd) >>> + return dict(map(lambda x: x.split(), output.splitlines())) >>> + >>> + >>> >> >> As discussed offline, it'd be good to document this function in more >> detail so that it's easier to add more locks later. 
Perhaps there could be >> a comment section at the start of the module describing what's necessary to >> add another type of lock. >> >> >>> +def _FindLockNames(locks): >>> + """ Finds a mapping of internal lock names to the names of entities >>> locked. >>> + >>> + @type locks: dict of locking level to list >>> + @param locks: The locks that gnt-debug delay is holding. >>> + >>> + @rtype: dict of string to string >>> + @return: The lock name to entity name map. >>> + >>> + """ >>> + lock_map = {} >>> + >>> + if locking.LEVEL_NODE in locks: >>> + node_locks = locks[locking.LEVEL_NODE] >>> + if node_locks == locking.ALL_SET: >>> + # Empty list retrieves all info >>> + name_uuid_map = _GetNodeUUIDMap([]) >>> + else: >>> + name_uuid_map = _GetNodeUUIDMap(node_locks) >>> + >>> + for name in name_uuid_map: >>> + lock_map["node/%s" % name_uuid_map[name]] = name >>> + >>> + # With locking.ALL_SET being defined as None, only node_locks does >>> not >>> + # suffice >>> + if node_locks == locking.ALL_SET or node_locks: >>> + lock_map["node/[lockset]"] = "joint node lock" >>> + >>> + #TODO add other lock types here when support for these is added >>> + return lock_map >>> + >>> + >>> +def _GetBlockingLocks(): >>> + """ Finds out which locks are blocking jobs by invoking "gnt-debug >>> locks". >>> + >>> + @rtype: list of string >>> + @return: The names of the locks currently blocking any job. 
>>> + >>> + """ >>> + # Due to mysterious issues when a SSH multiplexer is being used by two >>> + # threads, we turn it off, and block most of the logging to improve >>> the >>> + # visibility of the other thread's output >>> + locks_output = _GetOutputFromMaster("gnt-debug locks", >>> use_multiplexer=False, >>> + log_cmd=False) >>> + >>> + # The first non-empty line is the header, which we do not need >>> + lock_lines = locks_output.splitlines()[1:] >>> + >>> + blocking_locks = [] >>> + for lock_line in lock_lines: >>> + components = lock_line.split() >>> + if len(components) != 4: >>> + raise qa_error.Error("Error while parsing gnt-debug locks output, >>> " >>> + "line at fault is: %s" % lock_line) >>> + >>> + lock_name, _, _, pending_jobs = components >>> + >>> + if pending_jobs != '-': >>> + blocking_locks.append(lock_name) >>> + >>> + return blocking_locks >>> + >>> + >>> # TODO: Can this be done as a decorator? Implement as needed. >>> def RunWithLocks(fn, locks, timeout, *args, **kwargs): >>> """ Runs the given function, acquiring a set of locks beforehand. >>> @@ -123,10 +203,11 @@ def RunWithLocks(fn, locks, timeout, *args, >>> **kwargs): >>> test, to try and see if the function can run in parallel with other >>> operations. >>> >>> - The current version simply creates the locks, which expire after a >>> given >>> - timeout, and attempts to invoke the provided function. >>> - >>> - This will probably block the QA, and future versions will address >>> this. >>> + Locks are acquired by invoking a gnt-debug delay operation which can >>> be >>> + interrupted as needed. The QA test is then run in a separate thread, >>> with the >>> + current thread observing jobs waiting for locks. When a job is >>> spotted waiting >>> + for a lock held by the started delay operation, this is noted, and >>> the delay >>> + is interrupted, allowing the QA test to continue. 
>>> >>> A default timeout is not provided by design - the test creator must >>> make a >>> good conservative estimate. >>> @@ -139,11 +220,44 @@ def RunWithLocks(fn, locks, timeout, *args, >>> **kwargs): >>> # The watcher may interfere by issuing its own jobs - therefore pause >>> it >>> AssertCommand(["gnt-cluster", "watcher", "pause", "12h"]) >>> >>> + # Find out the lock names prior to starting the delay function >>> + lock_name_map = _FindLockNames(locks) >>> + >>> termination_socket = _StartDelayFunction(locks, timeout) >>> >>> - fn(*args, **kwargs) >>> + qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs) >>> + qa_thread.start() >>> >>> - _TerminateDelayFunction(termination_socket) >>> + blocking_owned_locks = [] >>> + test_blocked = False >>> + >>> + try: >>> + while qa_thread.isAlive(): >>> + blocking_locks = _GetBlockingLocks() >>> + blocking_owned_locks = \ >>> + set(blocking_locks).intersection(set(lock_name_map)) >>> + >>> + if blocking_owned_locks: >>> + test_blocked = True >>> + _TerminateDelayFunction(termination_socket) >>> + break >>> + >>> + # The sleeping time has been set arbitrarily >>> + time.sleep(5) >>> + except: >>> + # If anything goes wrong here, we should be responsible and >>> terminate the >>> + # delay job >>> + _TerminateDelayFunction(termination_socket) >>> + raise >>> >> + >>> + qa_thread.join() >>> + >>> + if test_blocked: >>> + blocking_lock_names = map(lock_name_map.get, blocking_owned_locks) >>> + raise qa_error.Error("QA test succeded, but was blocked by the >>> locks: %s" % >>> + ", ".join(blocking_lock_names)) >>> + else: >>> + _TerminateDelayFunction(termination_socket) >>> >>> # Revive the watcher >>> AssertCommand(["gnt-cluster", "watcher", "continue"]) >>> -- >>> 1.9.0.rc1.175.g0b1dcb5 >>> >>> >> LGTM, thanks. >> > >
