tfiala updated this revision to Diff 34114.
tfiala added a comment.

This change adds the threading-module-based parallel test runner strategy.  It 
is not selected by default.  It is pool-based.

The threading-pool and multiprocessing-pool strategies are 19.8% slower than 
the newer Ctrl-C-supporting multiprocessing strategy, which hand-wraps its own 
worker pool.  I'm going to keep the multiprocessing strategy turned on for 
everything except Windows NT.

I may later try to hand-wrap the pooling on the threading side as well, since 
it looks like the slowdown may actually come from the Pool provided by 
multiprocessing.pool.  We were always using that Pool before, so the 
multiprocessing test runner strategy represents a potential 20% win for 
same-class Linux boxes vs. the previous approach currently at top of tree.  
These numbers were measured on a 24-core Linux box using 24 processes/threads 
(the defaults for that box).

If I get this working on a Windows box with the current defaults, I'll check it 
in.


http://reviews.llvm.org/D12651

Files:
  test/dosep.py
  test/dotest.py
  test/dotest_args.py

Index: test/dotest_args.py
===================================================================
--- test/dotest_args.py
+++ test/dotest_args.py
@@ -128,11 +128,17 @@
         dest='num_threads',
         help=('The number of threads/processes to use when running tests '
               'separately, defaults to the number of CPU cores available'))
-    parser.add_argument(
+    group.add_argument(
         '--test-subdir',
         action='store',
         help='Specify a test subdirectory to use relative to the test root dir'
     )
+    group.add_argument(
+        '--test-runner-name',
+        action='store',
+        help=('Specify a test runner strategy.  Valid values: multiprocessing,'
+              ' multiprocessing-pool, serial')
+    )
 
     # Remove the reference to our helper function
     del X
Index: test/dotest.py
===================================================================
--- test/dotest.py
+++ test/dotest.py
@@ -249,6 +249,7 @@
 num_threads = None
 output_on_success = False
 no_multiprocess_test_runner = False
+test_runner_name = None
 
 def usage(parser):
     parser.print_help()
@@ -495,6 +496,7 @@
     global num_threads
     global output_on_success
     global no_multiprocess_test_runner
+    global test_runner_name
 
     do_help = False
 
@@ -756,7 +758,8 @@
     if args.inferior:
         is_inferior_test_runner = True
 
-    if args.output_on_success:
+    # Turn on output_on_success if either explicitly added or -v specified.
+    if args.output_on_success or args.v:
         output_on_success = True
 
     if args.num_threads:
@@ -765,6 +768,9 @@
     if args.test_subdir:
         multiprocess_test_subdir = args.test_subdir
 
+    if args.test_runner_name:
+        test_runner_name = args.test_runner_name
+
     if args.lldb_platform_name:
         lldb_platform_name = args.lldb_platform_name
     if args.lldb_platform_url:
@@ -1278,7 +1284,8 @@
     # multiprocess test runner here.
     if isMultiprocessTestRunner():
         import dosep
-        dosep.main(output_on_success, num_threads, multiprocess_test_subdir)
+        dosep.main(output_on_success, num_threads, multiprocess_test_subdir,
+                   test_runner_name)
         raise "should never get here"
 
     setupSysPath()
Index: test/dosep.py
===================================================================
--- test/dosep.py
+++ test/dosep.py
@@ -32,14 +32,19 @@
 echo core.%p | sudo tee /proc/sys/kernel/core_pattern
 """
 
+import fnmatch
 import multiprocessing
+import multiprocessing.pool
 import os
-import fnmatch
 import platform
+import Queue
 import re
-import dotest_args
+import signal
 import subprocess
 import sys
+import threading
+
+import dotest_args
 
 from optparse import OptionParser
 
@@ -142,7 +147,7 @@
     return passes, failures, unexpected_successes
 
 
-def call_with_timeout(command, timeout, name):
+def call_with_timeout(command, timeout, name, inferior_pid_events):
     """Run command with a timeout if possible."""
     """-s QUIT will create a coredump if they are enabled on your system"""
     process = None
@@ -161,8 +166,14 @@
                                    stdin=subprocess.PIPE,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.PIPE)
+    inferior_pid = process.pid
+    if inferior_pid_events:
+        inferior_pid_events.put_nowait(('created', inferior_pid))
     output = process.communicate()
     exit_status = process.returncode
+    if inferior_pid_events:
+        inferior_pid_events.put_nowait(('destroyed', inferior_pid))
+
     passes, failures, unexpected_successes = parse_test_results(output)
     if exit_status == 0:
         # stdout does not have any useful information from 'dotest.py',
@@ -173,7 +184,7 @@
     return name, exit_status, passes, failures, unexpected_successes
 
 
-def process_dir(root, files, test_root, dotest_argv):
+def process_dir(root, files, test_root, dotest_argv, inferior_pid_events):
     """Examine a directory for tests, and invoke any found within it."""
     results = []
     for name in files:
@@ -187,7 +198,8 @@
         timeout = (os.getenv("LLDB_%s_TIMEOUT" % timeout_name) or
                    getDefaultTimeout(dotest_options.lldb_platform_name))
 
-        results.append(call_with_timeout(command, timeout, name))
+        results.append(call_with_timeout(
+            command, timeout, name, inferior_pid_events))
 
     # result = (name, status, passes, failures, unexpected_successes)
     timed_out = [name for name, status, _, _, _ in results
@@ -208,39 +220,130 @@
 out_q = None
 
 
-def process_dir_worker(arg_tuple):
-    """Worker thread main loop when in multithreaded mode.
+def process_dir_worker_multiprocessing(
+        a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
+        a_dotest_options, job_queue, result_queue, inferior_pid_events):
+    """Worker thread main loop when in multiprocessing mode.
     Takes one directory specification at a time and works on it."""
-    return process_dir(*arg_tuple)
 
+    # Shut off interrupt handling in the child process.
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
 
-def walk_and_invoke(test_directory, test_subdir, dotest_argv, num_threads):
-    """Look for matched files and invoke test driver on each one.
-    In single-threaded mode, each test driver is invoked directly.
-    In multi-threaded mode, submit each test driver to a worker
-    queue, and then wait for all to complete.
+    # Setup the global state for the worker process.
+    setup_global_variables(
+        a_output_lock, a_test_counter, a_total_tests, a_test_name_len,
+        a_dotest_options)
 
-    test_directory - lldb/test/ directory
-    test_subdir - lldb/test/ or a subfolder with the tests we're interested in
-                  running
+    # Keep grabbing entries from the queue until done.
+    while not job_queue.empty():
+        try:
+            job = job_queue.get(block=False)
+            result = process_dir(job[0], job[1], job[2], job[3],
+                                 inferior_pid_events)
+            result_queue.put(result)
+        except Queue.Empty:
+            # Fine, we're done.
+            pass
+
+
+def process_dir_worker_multiprocessing_pool(args):
+    return process_dir(*args)
+
+
+def process_dir_worker_threading_pool(args):
+    return process_dir(*args)
+
+
+def process_dir_mapper_inprocess(args):
+    """Map adapter for running the subprocess-based, non-threaded test runner.
+
+    @param args the process work item tuple
+    @return the test result tuple
     """
+    return process_dir(*args)
 
-    # Collect the test files that we'll run.
-    test_work_items = []
-    for root, dirs, files in os.walk(test_subdir, topdown=False):
-        def is_test(name):
+
+def collect_active_pids_from_pid_events(event_queue):
+    """
+    Returns the set of what should be active inferior pids based on
+    the event stream.
+
+    @param event_queue a multiprocessing.Queue containing events of the
+    form:
+         ('created', pid)
+         ('destroyed', pid)
+
+    @return set of inferior dotest.py pids activated but never completed.
+    """
+    active_pid_set = set()
+    while not event_queue.empty():
+        pid_event = event_queue.get_nowait()
+        if pid_event[0] == 'created':
+            active_pid_set.add(pid_event[1])
+        elif pid_event[0] == 'destroyed':
+            active_pid_set.remove(pid_event[1])
+    return active_pid_set
+
+
+def kill_all_worker_processes(workers, inferior_pid_events):
+    """
+    Kills all specified worker processes and their process tree.
+
+    @param workers a list of multiprocessing.Process worker objects.
+    @param inferior_pid_events a multiprocessing.Queue that contains
+    all inferior create and destroy events.  Used to construct
+    the list of child pids still outstanding that need to be killed.
+    """
+    for worker in workers:
+        worker.terminate()
+        worker.join()
+
+    # Add all the child test pids created.
+    active_pid_set = collect_active_pids_from_pid_events(
+        inferior_pid_events)
+    for inferior_pid in active_pid_set:
+        print "killing inferior pid {}".format(inferior_pid)
+        os.kill(inferior_pid, signal.SIGKILL)
+
+
+def find_test_files_in_dir_tree(dir_root, found_func):
+    """Calls found_func for all the test files in the given dir hierarchy.
+
+    @param dir_root the path to the directory to start scanning
+    for test files.  All files in this directory and all its children
+    directory trees will be searched.
+
+    @param found_func a callable object that will be passed
+    the parent directory (relative to dir_root) and the list of
+    test files from within that directory.
+    """
+    for root, _, files in os.walk(dir_root, topdown=False):
+        def is_test_filename(test_dir, base_filename):
+            """Returns True if the given filename matches the test name format.
+
+            @param test_dir the directory to check.  Should be absolute or
+            relative to current working directory.
+
+            @param base_filename the base name of the filename to check for
+            adherence to the python test case filename format.
+
+            @return True if name matches the python test case filename format.
+            """
             # Not interested in symbolically linked files.
-            if os.path.islink(os.path.join(root, name)):
+            if os.path.islink(os.path.join(test_dir, base_filename)):
                 return False
             # Only interested in test files with the "Test*.py" naming pattern.
-            return name.startswith("Test") and name.endswith(".py")
+            return (base_filename.startswith("Test") and
+                    base_filename.endswith(".py"))
 
-        tests = filter(is_test, files)
-        test_work_items.append((root, tests, test_directory, dotest_argv))
+        tests = [filename for filename in files
+                 if is_test_filename(root, filename)]
+        if tests:
+            found_func(root, tests)
 
-    global output_lock, test_counter, total_tests, test_name_len
-    output_lock = multiprocessing.RLock()
-    # item = (root, tests, test_directory, dotest_argv)
+
+def initialize_global_vars_common(num_threads, test_work_items):
+    global total_tests, test_counter, test_name_len
     total_tests = sum([len(item[1]) for item in test_work_items])
     test_counter = multiprocessing.Value('i', 0)
     test_name_len = multiprocessing.Value('i', 0)
@@ -248,19 +351,158 @@
         total_tests, num_threads, (num_threads > 1) * "s")
     update_progress()
 
-    # Run the items, either in a pool (for multicore speedup) or
-    # calling each individually.
-    if num_threads > 1:
-        pool = multiprocessing.Pool(
-            num_threads,
-            initializer=setup_global_variables,
-            initargs=(output_lock, test_counter, total_tests, test_name_len,
-                      dotest_options))
-        test_results = pool.map(process_dir_worker, test_work_items)
-    else:
-        test_results = map(process_dir_worker, test_work_items)
 
-    # result = (timed_out, failed, passed, unexpected_successes, fail_count, pass_count)
+def initialize_global_vars_multiprocessing(num_threads, test_work_items):
+    # Initialize the global state we'll use to communicate with the
+    # rest of the flat module.
+    global output_lock
+    output_lock = multiprocessing.RLock()
+    initialize_global_vars_common(num_threads, test_work_items)
+
+
+def initialize_global_vars_threading(num_threads, test_work_items):
+    # Initialize the global state we'll use to communicate with the
+    # rest of the flat module.
+    global output_lock
+    output_lock = threading.RLock()
+    initialize_global_vars_common(num_threads, test_work_items)
+
+
+def multiprocessing_test_runner(num_threads, test_work_items):
+    """Provides multiprocessing.Pool.map() test runner adapter.
+
+    This concurrent test runner is based on the multiprocessing
+    library, and rolls its own worker pooling strategy so it
+    can handle Ctrl-C properly.
+
+    This test runner is known to have an issue running on
+    Windows platforms.
+
+    @param num_threads the number of worker processes to use.
+
+    @param test_work_items the iterable of test work item tuples
+    to run.
+    """
+
+    # Initialize our global state.
+    initialize_global_vars_multiprocessing(num_threads, test_work_items)
+
+    # Create jobs.
+    job_queue = multiprocessing.Queue(len(test_work_items))
+    for test_work_item in test_work_items:
+        job_queue.put(test_work_item)
+
+    result_queue = multiprocessing.Queue(len(test_work_items))
+
+    # Create queues for started child pids.  Terminating
+    # the multiprocess processes does not terminate the
+    # child processes they spawn.  We can remove this tracking
+    # if/when we move to having the multiprocess process directly
+    # perform the test logic.  The Queue size needs to be able to
+    # hold 2 * (num inferior dotest.py processes started) entries.
+    inferior_pid_events = multiprocessing.Queue(4096)
+
+    # Create workers.  We don't use multiprocessing.Pool due to
+    # challenges with handling ^C keyboard interrupts.
+    workers = []
+    for _ in range(num_threads):
+        worker = multiprocessing.Process(
+            target=process_dir_worker_multiprocessing,
+            args=(output_lock,
+                  test_counter,
+                  total_tests,
+                  test_name_len,
+                  dotest_options,
+                  job_queue,
+                  result_queue,
+                  inferior_pid_events))
+        worker.start()
+        workers.append(worker)
+
+    # Wait for all workers to finish, handling ^C as needed.
+    try:
+        for worker in workers:
+            worker.join()
+    except KeyboardInterrupt:
+        # First try to drain the queue of work and let the
+        # running tests complete.
+        while not job_queue.empty():
+            try:
+                # Just drain it to stop more work from being started.
+                job_queue.get_nowait()
+            except Queue.Empty:
+                pass
+
+        print ('\nFirst KeyboardInterrupt received, stopping '
+               'future work.  Press again to hard-stop existing tests.')
+        try:
+            for worker in workers:
+                worker.join()
+        except KeyboardInterrupt:
+            print ('\nSecond KeyboardInterrupt received, killing '
+                   'all worker process trees.')
+            kill_all_worker_processes(workers, inferior_pid_events)
+
+    test_results = []
+    while not result_queue.empty():
+        test_results.append(result_queue.get(block=False))
+    return test_results
+
+
+def multiprocessing_test_runner_pool(num_threads, test_work_items):
+    # Initialize our global state.
+    initialize_global_vars_multiprocessing(num_threads, test_work_items)
+
+    pool = multiprocessing.Pool(
+        num_threads,
+        initializer=setup_global_variables,
+        initargs=(output_lock, test_counter, total_tests, test_name_len,
+                  dotest_options))
+    return pool.map(process_dir_worker_multiprocessing_pool, test_work_items)
+
+
+def threading_test_runner_pool(num_threads, test_work_items):
+    # Initialize our global state.
+    initialize_global_vars_threading(num_threads, test_work_items)
+
+    pool = multiprocessing.pool.ThreadPool(
+        num_threads
+        # initializer=setup_global_variables,
+        # initargs=(output_lock, test_counter, total_tests, test_name_len,
+        #           dotest_options)
+    )
+    return pool.map(process_dir_worker_threading_pool, test_work_items)
+
+
+def inprocess_exec_test_runner(test_work_items):
+    # Initialize our global state.
+    initialize_global_vars_multiprocessing(1, test_work_items)
+    return map(process_dir_mapper_inprocess, test_work_items)
+
+
+def walk_and_invoke(test_directory, test_subdir, dotest_argv,
+                    test_runner_func):
+    """Look for matched files and invoke test driver on each one.
+    In single-threaded mode, each test driver is invoked directly.
+    In multi-threaded mode, submit each test driver to a worker
+    queue, and then wait for all to complete.
+
+    test_directory - lldb/test/ directory
+    test_subdir - lldb/test/ or a subfolder with the tests we're interested in
+                  running
+    """
+
+    # Collect the test files that we'll run.
+    test_work_items = []
+    find_test_files_in_dir_tree(
+        test_subdir, lambda testdir, test_files: test_work_items.append([
+            test_subdir, test_files, test_directory, dotest_argv, None]))
+
+    # Convert test work items into test results using whatever
+    # was provided as the test run function.
+    test_results = test_runner_func(test_work_items)
+
+    # Summarize the results and return to caller.
     timed_out = sum([result[0] for result in test_results], [])
     passed = sum([result[1] for result in test_results], [])
     failed = sum([result[2] for result in test_results], [])
@@ -268,7 +510,8 @@
     pass_count = sum([result[4] for result in test_results])
     fail_count = sum([result[5] for result in test_results])
 
-    return (timed_out, passed, failed, unexpected_successes, pass_count, fail_count)
+    return (timed_out, passed, failed, unexpected_successes, pass_count,
+            fail_count)
 
 
 def getExpectedTimeouts(platform_name):
@@ -354,7 +597,43 @@
     return result
 
 
-def main(print_details_on_success, num_threads, test_subdir):
+def get_test_runner_strategies(num_threads):
+    """Returns the test runner strategies by name in a dictionary.
+
+    @param num_threads specifies the number of threads/processes
+    that will be used for concurrent test runners.
+
+    @return dictionary with key as test runner strategy name and
+    value set to a callable object that takes the test work item
+    and returns a test result tuple.
+    """
+    return {
+        # multiprocessing supports ctrl-c and does not use
+        # multiprocessing.Pool.
+        "multiprocessing":
+        (lambda work_items: multiprocessing_test_runner(
+            num_threads, work_items)),
+
+        # multiprocessing-pool uses multiprocessing.Pool but
+        # does not support Ctrl-C.
+        "multiprocessing-pool":
+        (lambda work_items: multiprocessing_test_runner_pool(
+            num_threads, work_items)),
+
+        "threading-pool":
+        (lambda work_items: threading_test_runner_pool(
+            num_threads, work_items)),
+
+        # serial uses the subprocess-based, single process
+        # test runner.  This provides process isolation but
+        # no concurrent test running.
+        "serial":
+        inprocess_exec_test_runner
+        }
+
+
+def main(print_details_on_success, num_threads, test_subdir,
+         test_runner_name):
     """Run dotest.py in inferior mode in parallel.
 
     @param print_details_on_success the parsed value of the output-on-success
@@ -368,6 +647,13 @@
     @param test_subdir optionally specifies a subdir to limit testing
     within.  May be None if the entire test tree is to be used.  This subdir
     is assumed to be relative to the lldb/test root of the test hierarchy.
+
+    @param test_runner_name if specified, contains the test runner
+    name which selects the strategy used to run the isolated and
+    optionally concurrent test runner. Specify None to allow the
+    system to choose the most appropriate test runner given desired
+    thread count and OS type.
+
     """
 
     dotest_argv = sys.argv[1:]
@@ -435,8 +721,38 @@
         num_threads = 1
 
     system_info = " ".join(platform.uname())
-    (timed_out, passed, failed, unexpected_successes, pass_count, fail_count) = walk_and_invoke(
-        test_directory, test_subdir, dotest_argv, num_threads)
+
+    # Figure out which testrunner strategy we'll use.
+    runner_strategies_by_name = get_test_runner_strategies(num_threads)
+
+    # If the user didn't specify a test runner strategy, determine
+    # the default now based on number of threads and OS type.
+    if not test_runner_name:
+        if num_threads == 1:
+            # Use the serial runner.
+            test_runner_name = "serial"
+        elif os.name == "nt":
+            # Currently the multiprocessing test runner with ctrl-c
+            # support isn't running correctly on nt.  Use the pool
+            # support without ctrl-c.
+            test_runner_name = "multiprocessing-pool"
+        else:
+            # For everyone else, use the ctrl-c-enabled
+            # multiprocessing support.
+            test_runner_name = "multiprocessing"
+
+    if test_runner_name not in runner_strategies_by_name:
+        raise ("specified testrunner name '{}' unknown. "
+               "Valid choices: {}".format(
+                   test_runner_name,
+                   runner_strategies_by_name.keys()))
+    test_runner_func = runner_strategies_by_name[test_runner_name]
+
+    summary_results = walk_and_invoke(
+        test_directory, test_subdir, dotest_argv, test_runner_func)
+
+    (timed_out, passed, failed, unexpected_successes, pass_count,
+     fail_count) = summary_results
 
     timed_out = set(timed_out)
     num_test_files = len(passed) + len(failed)
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
http://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to