LGTM, thanks.

On Mon, Feb 24, 2014 at 2:33 PM, Hrvoje Ribicic <[email protected]> wrote:

> This should improve the situation:
>
> diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py
> index 1207e68..fc068e2 100644
> --- a/qa/qa_job_utils.py
> +++ b/qa/qa_job_utils.py
> @@ -125,7 +125,7 @@ def _GetNodeUUIDMap(nodes):
>
>
>  def _FindLockNames(locks):
> -  """ Finds a mapping of internal lock names to the names of entities
> locked.
> +  """ Finds the ids and descriptions of locks that given locks can block.
>
>    @type locks: dict of locking level to list
>    @param locks: The locks that gnt-debug delay is holding.
> @@ -133,6 +133,10 @@ def _FindLockNames(locks):
>    @rtype: dict of string to string
>    @return: The lock name to entity name map.
>
> +  For a given set of locks, some internal locks (e.g. ALL_SET locks) can
> be
> +  blocked even though they were not listed explicitly. This function has
> to take
> +  care and list all locks that can be blocked by the locks given as
> parameters.
> +
>    """
>    lock_map = {}
>
> @@ -147,8 +151,8 @@ def _FindLockNames(locks):
>      for name in name_uuid_map:
>        lock_map["node/%s" % name_uuid_map[name]] = name
>
> -    # With locking.ALL_SET being defined as None, only node_locks does not
> -    # suffice
> +    # If ALL_SET was requested explicitly, or there is at least one lock
> +    # Note that locking.ALL_SET is None and hence the strange form of the
> if
>      if node_locks == locking.ALL_SET or node_locks:
>        lock_map["node/[lockset]"] = "joint node lock"
>
>
>
>
> On Mon, Feb 24, 2014 at 12:46 PM, Petr Pudlák <[email protected]> wrote:
>
>>
>>
>>
>> On Tue, Feb 18, 2014 at 3:39 PM, Hrvoje Ribicic <[email protected]> wrote:
>>
>>> This patch adds threading to the RunWithLocks function, allowing one
>>> thread to execute the QA test, and the other to monitor if it is being
>>> blocked by locks set up during the test. If it is, terminate the
>>> blocking job, and let the QA continue, reporting the test failure at
>>> the very end.
>>>
>>> Signed-off-by: Hrvoje Ribicic <[email protected]>
>>> ---
>>>  qa/qa_job_utils.py | 130
>>> +++++++++++++++++++++++++++++++++++++++++++++++++----
>>>  1 file changed, 122 insertions(+), 8 deletions(-)
>>>
>>> diff --git a/qa/qa_job_utils.py b/qa/qa_job_utils.py
>>> index bc4733c..03f8ee1 100644
>>> --- a/qa/qa_job_utils.py
>>> +++ b/qa/qa_job_utils.py
>>> @@ -24,6 +24,8 @@
>>>  """
>>>
>>>  import re
>>> +import threading
>>> +import time
>>>
>>>  from ganeti import constants
>>>  from ganeti import locking
>>> @@ -38,7 +40,7 @@ from qa_utils import AssertCommand, GetCommandOutput,
>>> GetObjectInfo
>>>  AVAILABLE_LOCKS = [locking.LEVEL_NODE, ]
>>>
>>>
>>> -def _GetOutputFromMaster(cmd):
>>> +def _GetOutputFromMaster(cmd, use_multiplexer=True, log_cmd=True):
>>>    """ Gets the output of a command executed on master.
>>>
>>>    """
>>> @@ -51,7 +53,8 @@ def _GetOutputFromMaster(cmd):
>>>    # buildbot
>>>    cmdstr += " 2>&1"
>>>
>>> -  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr)
>>> +  return GetCommandOutput(qa_config.GetMasterNode().primary, cmdstr,
>>> +                          use_multiplexer=use_multiplexer,
>>> log_cmd=log_cmd)
>>>
>>>
>>>  def ExecuteJobProducingCommand(cmd):
>>> @@ -107,6 +110,83 @@ def _TerminateDelayFunction(termination_socket):
>>>    AssertCommand("echo a | socat -u stdin UNIX-CLIENT:%s" %
>>> termination_socket)
>>>
>>>
>>> +def _GetNodeUUIDMap(nodes):
>>> +  """ Given a list of nodes, retrieves a mapping of their names to
>>> UUIDs.
>>> +
>>> +  @type nodes: list of string
>>> +  @param nodes: The nodes to retrieve a map for. If empty, returns
>>> information
>>> +                for all the nodes.
>>> +
>>> +  """
>>> +  cmd = ["gnt-node", "list", "--no-header", "-o", "name,uuid"]
>>> +  cmd.extend(nodes)
>>> +  output = _GetOutputFromMaster(cmd)
>>> +  return dict(map(lambda x: x.split(), output.splitlines()))
>>> +
>>> +
>>>
>>
>> As discussed offline, it'd be good to document this function in more
>> detail so that it's easier to add more locks later. Perhaps there could be
>> a comment section at the start of the module describing what's necessary to
>> add another type of lock.
>>
>>
>>> +def _FindLockNames(locks):
>>> +  """ Finds a mapping of internal lock names to the names of entities
>>> locked.
>>> +
>>> +  @type locks: dict of locking level to list
>>> +  @param locks: The locks that gnt-debug delay is holding.
>>> +
>>> +  @rtype: dict of string to string
>>> +  @return: The lock name to entity name map.
>>> +
>>> +  """
>>> +  lock_map = {}
>>> +
>>> +  if locking.LEVEL_NODE in locks:
>>> +    node_locks = locks[locking.LEVEL_NODE]
>>> +    if node_locks == locking.ALL_SET:
>>> +      # Empty list retrieves all info
>>> +      name_uuid_map = _GetNodeUUIDMap([])
>>> +    else:
>>> +      name_uuid_map = _GetNodeUUIDMap(node_locks)
>>> +
>>> +    for name in name_uuid_map:
>>> +      lock_map["node/%s" % name_uuid_map[name]] = name
>>> +
>>> +    # With locking.ALL_SET being defined as None, only node_locks does
>>> not
>>> +    # suffice
>>> +    if node_locks == locking.ALL_SET or node_locks:
>>> +      lock_map["node/[lockset]"] = "joint node lock"
>>> +
>>> +  #TODO add other lock types here when support for these is added
>>> +  return lock_map
>>> +
>>> +
>>> +def _GetBlockingLocks():
>>> +  """ Finds out which locks are blocking jobs by invoking "gnt-debug
>>> locks".
>>> +
>>> +  @rtype: list of string
>>> +  @return: The names of the locks currently blocking any job.
>>> +
>>> +  """
>>> +  # Due to mysterious issues when a SSH multiplexer is being used by two
>>> +  # threads, we turn it off, and block most of the logging to improve
>>> the
>>> +  # visibility of the other thread's output
>>> +  locks_output = _GetOutputFromMaster("gnt-debug locks",
>>> use_multiplexer=False,
>>> +                                      log_cmd=False)
>>> +
>>> +  # The first non-empty line is the header, which we do not need
>>> +  lock_lines = locks_output.splitlines()[1:]
>>> +
>>> +  blocking_locks = []
>>> +  for lock_line in lock_lines:
>>> +    components = lock_line.split()
>>> +    if len(components) != 4:
>>> +      raise qa_error.Error("Error while parsing gnt-debug locks output,
>>> "
>>> +                           "line at fault is: %s" % lock_line)
>>> +
>>> +    lock_name, _, _, pending_jobs = components
>>> +
>>> +    if pending_jobs != '-':
>>> +      blocking_locks.append(lock_name)
>>> +
>>> +  return blocking_locks
>>> +
>>> +
>>>  # TODO: Can this be done as a decorator? Implement as needed.
>>>  def RunWithLocks(fn, locks, timeout, *args, **kwargs):
>>>    """ Runs the given function, acquiring a set of locks beforehand.
>>> @@ -123,10 +203,11 @@ def RunWithLocks(fn, locks, timeout, *args,
>>> **kwargs):
>>>    test, to try and see if the function can run in parallel with other
>>>    operations.
>>>
>>> -  The current version simply creates the locks, which expire after a
>>> given
>>> -  timeout, and attempts to invoke the provided function.
>>> -
>>> -  This will probably block the QA, and future versions will address
>>> this.
>>> +  Locks are acquired by invoking a gnt-debug delay operation which can
>>> be
>>> +  interrupted as needed. The QA test is then run in a separate thread,
>>> with the
>>> +  current thread observing jobs waiting for locks. When a job is
>>> spotted waiting
>>> +  for a lock held by the started delay operation, this is noted, and
>>> the delay
>>> +  is interrupted, allowing the QA test to continue.
>>>
>>>    A default timeout is not provided by design - the test creator must
>>> make a
>>>    good conservative estimate.
>>> @@ -139,11 +220,44 @@ def RunWithLocks(fn, locks, timeout, *args,
>>> **kwargs):
>>>    # The watcher may interfere by issuing its own jobs - therefore pause
>>> it
>>>    AssertCommand(["gnt-cluster", "watcher", "pause", "12h"])
>>>
>>> +  # Find out the lock names prior to starting the delay function
>>> +  lock_name_map = _FindLockNames(locks)
>>> +
>>>    termination_socket = _StartDelayFunction(locks, timeout)
>>>
>>> -  fn(*args, **kwargs)
>>> +  qa_thread = threading.Thread(target=fn, args=args, kwargs=kwargs)
>>> +  qa_thread.start()
>>>
>>> -  _TerminateDelayFunction(termination_socket)
>>> +  blocking_owned_locks = []
>>> +  test_blocked = False
>>> +
>>> +  try:
>>> +    while qa_thread.isAlive():
>>> +      blocking_locks = _GetBlockingLocks()
>>> +      blocking_owned_locks = \
>>> +        set(blocking_locks).intersection(set(lock_name_map))
>>> +
>>> +      if blocking_owned_locks:
>>> +        test_blocked = True
>>> +        _TerminateDelayFunction(termination_socket)
>>> +        break
>>> +
>>> +      # The sleeping time has been set arbitrarily
>>> +      time.sleep(5)
>>> +  except:
>>> +    # If anything goes wrong here, we should be responsible and
>>> terminate the
>>> +    # delay job
>>> +    _TerminateDelayFunction(termination_socket)
>>> +    raise
>>>
>> +
>>> +  qa_thread.join()
>>> +
>>> +  if test_blocked:
>>> +    blocking_lock_names = map(lock_name_map.get, blocking_owned_locks)
>>> +    raise qa_error.Error("QA test succeded, but was blocked by the
>>> locks: %s" %
>>> +                         ", ".join(blocking_lock_names))
>>> +  else:
>>> +    _TerminateDelayFunction(termination_socket)
>>>
>>>    # Revive the watcher
>>>    AssertCommand(["gnt-cluster", "watcher", "continue"])
>>> --
>>> 1.9.0.rc1.175.g0b1dcb5
>>>
>>>
>> LGTM, thanks.
>>
>
>

Reply via email to