By using multiprocessing.dummy.Pool.apply_async() instead of .imap(), an exception raised in a worker thread can be re-raised in the main thread, stopping the run of the suite.
This series does not remove the ability to continue despite an exception; instead, it supplements it. This means that both use cases continue to work, and ignoring an exception is still the default. Users wanting to stop on an exception should add --thread-except to their arguments. v2: - Actually run in parallel. v3: - Allow stopping or not Signed-off-by: Dylan Baker <dylanx.c.ba...@intel.com> --- framework/options.py | 4 ++++ framework/profile.py | 44 ++++++++++++++++++++++++++++++++----------- framework/programs/parsers.py | 7 +++++++ framework/programs/run.py | 7 +++++-- 4 files changed, 49 insertions(+), 13 deletions(-) diff --git a/framework/options.py b/framework/options.py index cf49520..43834f9 100644 --- a/framework/options.py +++ b/framework/options.py @@ -179,6 +179,9 @@ class _Options(object): # pylint: disable=too-many-instance-attributes valgrind -- True if valgrind is to be used dmesg -- True if dmesg checking is desired. This forces concurrency off env -- environment variables set for each test before run + sync -- sync after each write operation + thread_except -- if there is an exception in a runner thread let it stop + the runner """ include_filter = _ReListDescriptor('_include_filter', type_=_FilterReList) @@ -193,6 +196,7 @@ class _Options(object): # pylint: disable=too-many-instance-attributes self.valgrind = False self.dmesg = False self.sync = False + self.thread_except = False # env is used to set some base environment variables that are not going # to change across runs, without sending them to os.environ which is diff --git a/framework/profile.py b/framework/profile.py index fc38c56..43418e3 100644 --- a/framework/profile.py +++ b/framework/profile.py @@ -252,22 +252,44 @@ class TestProfile(object): self._pre_run_hook() - chunksize = 1 - self._prepare_test_list() log = LogManager(logger, len(self.test_list)) - def test(pair): + def test(*args): """Function to call test.execute from map""" - name, test = pair + # pool.imap and pool.apply_async 
provide different arguments + if len(args) == 1: + name, test = args[0] + else: + assert len(args) == 2 + name, test = args + with backend.write_test(name) as w: test.execute(name, log.get(), self.dmesg) w(test.result) def run_threads(pool, testlist): - """ Open a pool, close it, and join it """ - pool.imap(test, testlist, chunksize) - pool.close() + """Run all of the tests, and look for exceptions. + + This creates a list of results, and then checks them. the only + reason to check each result is that if there is an exception then + it will be raised in the main thread rather than in the worker + threads. + + """ + if options.OPTIONS.thread_except: + results = [] + for pair in six.iteritems(testlist): + results.append(pool.apply_async(test, pair)) + + pool.close() + + for r in results: + r.get() + else: + pool.imap_unordered(test, six.iteritems(testlist), 10) + pool.close() + pool.join() # Multiprocessing.dummy is a wrapper around Threading that provides a @@ -278,15 +300,15 @@ class TestProfile(object): multi = multiprocessing.dummy.Pool() if options.OPTIONS.concurrent == "all": - run_threads(multi, six.iteritems(self.test_list)) + run_threads(multi, self.test_list) elif options.OPTIONS.concurrent == "none": - run_threads(single, six.iteritems(self.test_list)) + run_threads(single, self.test_list) else: # Filter and return only thread safe tests to the threaded pool - run_threads(multi, (x for x in six.iteritems(self.test_list) + run_threads(multi, (x for x in self.test_list if x[1].run_concurrent)) # Filter and return the non thread safe tests to the single pool - run_threads(single, (x for x in six.iteritems(self.test_list) + run_threads(single, (x for x in self.test_list if not x[1].run_concurrent)) log.get().summary() diff --git a/framework/programs/parsers.py b/framework/programs/parsers.py index 9e1d1e3..3d3b82d 100644 --- a/framework/programs/parsers.py +++ b/framework/programs/parsers.py @@ -42,6 +42,13 @@ CONFIG.add_argument("-f", "--config", 
type=argparse.FileType("r"), help="override piglit.conf search path") +THREAD_EXCEPT = argparse.ArgumentParser(add_help=False) +THREAD_EXCEPT.add_argument('--thread-except', + dest='thread_except', + action='store_true', + help='Stop the runner when an exception occurs in a' + ' runner thread.') + def parse_config(input_): """Convenience method for the CONFIG parser. diff --git a/framework/programs/run.py b/framework/programs/run.py index 49f7b11..e0ff3ff 100644 --- a/framework/programs/run.py +++ b/framework/programs/run.py @@ -87,7 +87,8 @@ def _run_parser(input_): unparsed = parsers.parse_config(input_)[1] # Set the parent of the config to add the -f/--config message - parser = argparse.ArgumentParser(parents=[parsers.CONFIG]) + parser = argparse.ArgumentParser(parents=[parsers.CONFIG, + parsers.THREAD_EXCEPT]) parser.add_argument("-n", "--name", metavar="<test name>", default=None, @@ -236,6 +237,7 @@ def run(input_): options.OPTIONS.valgrind = args.valgrind options.OPTIONS.dmesg = args.dmesg options.OPTIONS.sync = args.sync + options.OPTIONS.thread_except = args.thread_except # Set the platform to pass to waffle options.OPTIONS.env['PIGLIT_PLATFORM'] = args.platform @@ -277,7 +279,7 @@ def run(input_): @exceptions.handler def resume(input_): - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser(parents=[parsers.THREAD_EXCEPT]) parser.add_argument("results_path", type=path.realpath, metavar="<Results Path>", @@ -302,6 +304,7 @@ def resume(input_): options.OPTIONS.valgrind = results.options['valgrind'] options.OPTIONS.dmesg = results.options['dmesg'] options.OPTIONS.sync = results.options['sync'] + options.OPTIONS.thread_except = args.thread_except core.get_config(args.config_file) -- 2.7.2 _______________________________________________ Piglit mailing list Piglit@lists.freedesktop.org https://lists.freedesktop.org/mailman/listinfo/piglit