LGTM, thanks

On Thu, Apr 17, 2014 at 8:52 AM, Thomas Thrainer <[email protected]> wrote:

> This patch adds threading to the RunWithLocks function, allowing one
> thread to execute the QA test, and the other to monitor if it is being
> blocked by locks set up during the test. If it is, terminate the
> blocking job, and let the QA continue, reporting the test failure at
> the very end.
>
> Signed-off-by: Hrvoje Ribicic <[email protected]>
> Reviewed-by: Petr Pudlak <[email protected]>
> Signed-off-by: Thomas Thrainer <[email protected]>
>
> (cherry picked from commit 34c5ec6c78aa164eebb4ad6cfc14b38e81aad8ec)
> Signed-off-by: Thomas Thrainer <[email protected]>
> ---
>  qa/qa_job_utils.py | 136
> +++++++++++++++++++++++++++++++++++++++++++++++++----
>  1 file changed, 127 insertions(+), 9 deletions(-)
>
> diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py
> index 56e4b2e..fc068e2 100644
> --- a/qa/qa_job_utils.py
> +++ b/qa/qa_job_utils.py
> @@ -24,6 +24,8 @@
>  """
>
>  import re
> +import threading
> +import time
>
>  from ganeti import constants
>  from ganeti import locking
> @@ -38,7 +40,7 @@ from qa_utils import AssertCommand, GetCommandOutput,
> GetObjectInfo
>  AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
>
>
> -def _GetOutputFromMaster(cmd):
> +def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
>    """ Gets the output of a command executed on master.
>
>    """
> @@ -51,7 +53,8 @@ def _GetOutputFromMaster(cmd):
>    # buildbot
>    cmdstr += " 2>&1"
>
> -  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr)
> +  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
> +                          use_multiplexer=use_multiplexer,
> log_cmd=log_cmd)
>
>
>  def ExecuteJobProducingCommand(cmd):
> @@ -107,6 +110,87 @@ def _TerminateDelayFunction(termination_socket):
>    AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" %
> termination_socket)
>
>
> +def _GetNodeUUIDMap(nodes):
> +  """ Given a list of nodes, retrieves a mapping of their names to UUIDs.
> +
> +  @type nodes: list of string
> +  @param nodes: The nodes to retrieve a map for. If empty, returns
> information
> +                for all the nodes.
> +
> +  """
> +  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
> +  cmd.extend(nodes)
> +  output = _GetOutputFromMaster(cmd)
> +  return dict(map(lambda x: x.split(), output.splitlines()))
> +
> +
> +def _FindLockNames(locks):
> +  """ Finds the ids and descriptions of locks that given locks can block.
> +
> +  @type locks: dict of locking level to list
> +  @param locks: The locks that gnt-debug delay is holding.
> +
> +  @rtype: dict of string to string
> +  @return: The lock name to entity name map.
> +
> +  For a given set of locks, some internal locks (e.g. ALL_SET locks) can
> be
> +  blocked even though they were not listed explicitly. This function has
> to take
> +  care and list all locks that can be blocked by the locks given as
> parameters.
> +
> +  """
> +  lock_map = {}
> +
> +  if locking.LEVEL_NODE in locks:
> +    node_locks = locks[locking.LEVEL_NODE]
> +    if node_locks == locking.ALL_SET:
> +      # Empty list retrieves all info
> +      name_uuid_map = _GetNodeUUIDMap([])
> +    else:
> +      name_uuid_map = _GetNodeUUIDMap(node_locks)
> +
> +    for name in name_uuid_map:
> +      lock_map["node/%s" % name_uuid_map[name]] = name
> +
> +    # If ALL_SET was requested explicitly, or there is at least one lock
> +    # Note that locking.ALL_SET is None and hence the strange form of the
> if
> +    if node_locks == locking.ALL_SET or node_locks:
> +      lock_map["node/[lockset]"] = "joint node lock"
> +
> +  #TODO add other lock types here when support for these is added
> +  return lock_map
> +
> +
> +def _GetBlockingLocks():
> +  """ Finds out which locks are blocking jobs by invoking "gnt-debug
> locks".
> +
> +  @rtype: list of string
> +  @return: The names of the locks currently blocking any job.
> +
> +  """
> +  # Due to mysterious issues when a SSH multiplexer is being used by two
> +  # threads, we turn it off, and block most of the logging to improve the
> +  # visibility of the other thread's output
> +  locks_output = _GetOutputFromMaster("gnt-debug locks",
> use_multiplexer=False,
> +                                      log_cmd=False)
> +
> +  # The first non-empty line is the header, which we do not need
> +  lock_lines = locks_output.splitlines()[1:]
> +
> +  blocking_locks = []
> +  for lock_line in lock_lines:
> +    components = lock_line.split()
> +    if len(components) != 4:
> +      raise qa_error.Error("Error while parsing gnt-debug locks output, "
> +                           "line at fault is: %s" % lock_line)
> +
> +    lock_name, _, _, pending_jobs = components
> +
> +    if pending_jobs != '-':
> +      blocking_locks.append(lock_name)
> +
> +  return blocking_locks
> +
> +
>  # TODO: Can this be done as a decorator? Implement as needed.
>  def RunWithLocks(fn, locks, timeout, *args, **kwargs):
>    """ Runs the given function, acquiring a set of locks beforehand.
> @@ -123,10 +207,11 @@ def RunWithLocks(fn, locks, timeout, *args,
> **kwargs):
>    test, to try and see if the function can run in parallel with other
>    operations.
>
> -  The current version simply creates the locks, which expire after a given
> -  timeout, and attempts to invoke the provided function.
> -
> -  This will probably block the QA, and future versions will address this.
> +  Locks are acquired by invoking a gnt-debug delay operation which can be
> +  interrupted as needed. The QA test is then run in a separate thread,
> with the
> +  current thread observing jobs waiting for locks. When a job is spotted
> waiting
> +  for a lock held by the started delay operation, this is noted, and the
> delay
> +  is interrupted, allowing the QA test to continue.
>
>    A default timeout is not provided by design - the test creator must
> make a
>    good conservative estimate.
> @@ -139,11 +224,44 @@ def RunWithLocks(fn, locks, timeout, *args,
> **kwargs):
>    # The watcher may interfere by issuing its own jobs - therefore pause it
>    AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])
>
> -  termination_socket = _StartDelayFunction(locks, timeout)
> +  # Find out the lock names prior to starting the delay function
> +  lock_name_map = _FindLockNames(locks)
>
> -  fn(*args, **kwargs)
> +  termination_socket = _StartDelayFunction(locks, timeout)
>
> -  _TerminateDelayFunction(termination_socket)
> +  qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
> +  qa_thread.start()
> +
> +  blocking_owned_locks = []
> +  test_blocked = False
> +
> +  try:
> +    while qa_thread.isAlive():
> +      blocking_locks = _GetBlockingLocks()
> +      blocking_owned_locks = \
> +        set(blocking_locks).intersection(set(lock_name_map))
> +
> +      if blocking_owned_locks:
> +        test_blocked = True
> +        _TerminateDelayFunction(termination_socket)
> +        break
> +
> +      # The sleeping time has been set arbitrarily
> +      time.sleep(5)
> +  except:
> +    # If anything goes wrong here, we should be responsible and terminate
> the
> +    # delay job
> +    _TerminateDelayFunction(termination_socket)
> +    raise
> +
> +  qa_thread.join()
> +
> +  if test_blocked:
> +    blocking_lock_names = map(lock_name_map.get, blocking_owned_locks)
> +    raise qa_error.Error("QA test succeded, but was blocked by the locks:
> %s" %
> +                         ", ".join(blocking_lock_names))
> +  else:
> +    _TerminateDelayFunction(termination_socket)
>
>    # Revive the watcher
>    AssertCommand(["gnt-cluster", "watcher", "continue"])
> --
> 1.9.1.423.g4596e3a
>
>


-- 
-- 
Helga Velroyen | Software Engineer | [email protected] |

Google Germany GmbH
Dienerstr. 12
80331 München

Registergericht und -nummer: Hamburg, HRB 86891
Sitz der Gesellschaft: Hamburg
Geschäftsführer: Graham Law, Christine Elizabeth Flores

Reply via email to