LGTM, thanks
On Thu, Apr 17, 2014 at 8:52 AM, Thomas Thrainer <[email protected]>wrote: > This patch adds threading to the RunWithTests function, allowing one > thread to execute the QA test, and the other to monitor if it is being > blocked by locks set up during the test. If it is, terminate the > blocking job, and let the QA continue, reporting the test failure at > the very end. > > Signed-off-by: Hrvoje Ribicic <[email protected]> > Reviewed-by: Petr Pudlak <[email protected]> > Signed-off-by: Thomas Thrainer <[email protected]> > > (cherry picked from commit 34c5ec6c78aa164eebb4ad6cfc14b38e81aad8ec) > Signed-off-by: Thomas Thrainer <[email protected]> > --- > qa/qa_job_utils.py | 136 > +++++++++++++++++++++++++++++++++++++++++++++++++---- > 1 file changed, 127 insertions(+), 9 deletions(-) > > diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py > index 56e4b2e..fc068e2 100644 > --- a/qa/qa_job_utils.py > +++ b/qa/qa_job_utils.py > @@ -24,6 +24,8 @@ > """ > > import re > +import threading > +import time > > from ganeti import constants > from ganeti import locking > @@ -38,7 +40,7 @@ from qa_utils import AssertCommand, GetCommandOutput, > GetObjectInfo > AVAILABLE_LOCKS = [locking.LEVEL_NODE, ] > > > -def _GetOutputFromMaster(cmd): > +def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True): > """ Gets the output of a command executed on master. > > """ > @@ -51,7 +53,8 @@ def _GetOutputFromMaster(cmd): > # buildbot > cmdstr += " 2>&1" > > - return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr) > + return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr, > + use_multiplexer=use_multiplexer, > log_cmd=log_cmd) > > > def ExecuteJobProducingCommand(cmd): > @@ -107,6 +110,87 @@ def _TerminateDelayFunction(termination_socket): > AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" % > termination_socket) > > > +def _GetNodeUUIDMap(nodes): > + """ Given a list of nodes, retrieves a mapping of their names to UUIDs. > + > + @type nodes: list of string > + @param nodes: The nodes to retrieve a map for. If empty, returns > information > + for all the nodes. > + > + """ > + cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"] > + cmd.extend(nodes) > + output = _GetOutputFromMaster(cmd) > + return dict(map(lambda x: x.split(), output.splitlines())) > + > + > +def _FindLockNames(locks): > + """ Finds the ids and descriptions of locks that given locks can block. > + > + @type locks: dict of locking level to list > + @param locks: The locks that gnt-debug delay is holding. > + > + @rtype: dict of string to string > + @return: The lock name to entity name map. > + > + For a given set of locks, some internal locks (e.g. ALL_SET locks) can > be > + blocked even though they were not listed explicitly. This function has > to take > + care and list all locks that can be blocked by the locks given as > parameters. > + > + """ > + lock_map = {} > + > + if locking.LEVEL_NODE in locks: > + node_locks = locks[locking.LEVEL_NODE] > + if node_locks == locking.ALL_SET: > + # Empty list retrieves all info > + name_uuid_map = _GetNodeUUIDMap([]) > + else: > + name_uuid_map = _GetNodeUUIDMap(node_locks) > + > + for name in name_uuid_map: > + lock_map["node/%s" % name_uuid_map[name]] = name > + > + # If ALL_SET was requested explicitly, or there is at least one lock > + # Note that locking.ALL_SET is None and hence the strange form of the > if > + if node_locks == locking.ALL_SET or node_locks: > + lock_map["node/[lockset]"] = "joint node lock" > + > + #TODO add other lock types here when support for these is added > + return lock_map > + > + > +def _GetBlockingLocks(): > + """ Finds out which locks are blocking jobs by invoking "gnt-debug > locks". > + > + @rtype: list of string > + @return: The names of the locks currently blocking any job. > + > + """ > + # Due to mysterious issues when a SSH multiplexer is being used by two > + # threads, we turn it off, and block most of the logging to improve the > + # visibility of the other thread's output > + locks_output = _GetOutputFromMaster("gnt-debug locks", > use_multiplexer=False, > + log_cmd=False) > + > + # The first non-empty line is the header, which we do not need > + lock_lines = locks_output.splitlines()[1:] > + > + blocking_locks = [] > + for lock_line in lock_lines: > + components = lock_line.split() > + if len(components) != 4: > + raise qa_error.Error("Error while parsing gnt-debug locks output, " > + "line at fault is: %s" % lock_line) > + > + lock_name, _, _, pending_jobs = components > + > + if pending_jobs != '-': > + blocking_locks.append(lock_name) > + > + return blocking_locks > + > + > # TODO: Can this be done as a decorator? Implement as needed. > def RunWithLocks(fn, locks, timeout, *args, **kwargs): > """ Runs the given function, acquiring a set of locks beforehand. > @@ -123,10 +207,11 @@ def RunWithLocks(fn, locks, timeout, *args, > **kwargs): > test, to try and see if the function can run in parallel with other > operations. > > - The current version simply creates the locks, which expire after a given > - timeout, and attempts to invoke the provided function. > - > - This will probably block the QA, and future versions will address this. > + Locks are acquired by invoking a gnt-debug delay operation which can be > + interrupted as needed. The QA test is then run in a separate thread, > with the > + current thread observing jobs waiting for locks. When a job is spotted > waiting > + for a lock held by the started delay operation, this is noted, and the > delay > + is interrupted, allowing the QA test to continue. > > A default timeout is not provided by design - the test creator must > make a > good conservative estimate. > @@ -139,11 +224,44 @@ def RunWithLocks(fn, locks, timeout, *args, > **kwargs): > # The watcher may interfere by issuing its own jobs - therefore pause it > AssertCommand(["gnt-cluster", "watcher", "pause", "12h"]) > > - termination_socket = _StartDelayFunction(locks, timeout) > + # Find out the lock names prior to starting the delay function > + lock_name_map = _FindLockNames(locks) > > - fn(*args, **kwargs) > + termination_socket = _StartDelayFunction(locks, timeout) > > - _TerminateDelayFunction(termination_socket) > + qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs) > + qa_thread.start() > + > + blocking_owned_locks = [] > + test_blocked = False > + > + try: > + while qa_thread.isAlive(): > + blocking_locks = _GetBlockingLocks() > + blocking_owned_locks = \ > + set(blocking_locks).intersection(set(lock_name_map)) > + > + if blocking_owned_locks: > + test_blocked = True > + _TerminateDelayFunction(termination_socket) > + break > + > + # The sleeping time has been set arbitrarily > + time.sleep(5) > + except: > + # If anything goes wrong here, we should be responsible and terminate > the > + # delay job > + _TerminateDelayFunction(termination_socket) > + raise > + > + qa_thread.join() > + > + if test_blocked: > + blocking_lock_names = map(lock_name_map.get, blocking_owned_locks) > + raise qa_error.Error("QA test succeded, but was blocked by the locks: > %s" % > + ", ".join(blocking_lock_names)) > + else: > + _TerminateDelayFunction(termination_socket) > > # Revive the watcher > AssertCommand(["gnt-cluster", "watcher", "continue"]) > -- > 1.9.1.423.g4596e3a > > -- -- Helga Velroyen | Software Engineer | [email protected] | Google Germany GmbH Dienerstr. 12 80331 München Registergericht und -nummer: Hamburg, HRB 86891 Sitz der Gesellschaft: Hamburg Geschäftsführer: Graham Law, Christine Elizabeth Flores
