Modified: trunk/Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py (278453 => 278454)
--- trunk/Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py 2021-06-04 12:04:59 UTC (rev 278453)
+++ trunk/Tools/Scripts/webkitpy/layout_tests/controllers/layout_test_runner.py 2021-06-04 14:04:35 UTC (rev 278454)
@@ -32,9 +32,11 @@
import time
from webkitcorepy.string_utils import pluralize
+from webkitcorepy import TaskPool
from webkitpy.common import message_pool
from webkitpy.common.iteration_compatibility import iteritems
+from webkitpy.common.interrupt_debugging import log_stack_trace_on_signal
from webkitpy.layout_tests.controllers import single_test_runner
from webkitpy.layout_tests.models.test_run_results import TestRunResults
from webkitpy.layout_tests.models import test_expectations
@@ -47,10 +49,42 @@
TestExpectations = test_expectations.TestExpectations
-# Export this so callers don't need to know about message pools.
-WorkerException = message_pool.WorkerException
+def setup_shard(port=None, results_directory=None, devices=None, retrying=False):
+ if devices and getattr(port, 'DEVICE_MANAGER', None):
+ port.DEVICE_MANAGER.AVAILABLE_DEVICES = devices.get('available_devices', [])
+ port.DEVICE_MANAGER.INITIALIZED_DEVICES = devices.get('initialized_devices', None)
+ if retrying:
+ results_directory = port.host.filesystem.join(results_directory, 'retries')
+ port.host.filesystem.maybe_make_directory(results_directory)
+
+ stack_trace_path = port.host.filesystem.join(results_directory, 'python_stack_trace.txt')
+ log_stack_trace_on_signal('SIGTERM', output_file=stack_trace_path)
+ log_stack_trace_on_signal('SIGINT', output_file=stack_trace_path)
+
+ port.did_spawn_worker(int((TaskPool.Process.name).split('/')[-1]))
+ return Worker.setup(port=port, results_directory=results_directory)
+
+
+def handle_started_test(worker, name):
+ if LayoutTestRunner.instance:
+ LayoutTestRunner.instance.printer.print_started_test(name)
+
+
+def run_shard(shard):
+ return Worker.instance.run_tests(shard)
+
+
+def handle_finished_test(worker, result):
+ if LayoutTestRunner.instance:
+ LayoutTestRunner.instance.update_summary_with_result(result)
+
+
+def teardown_shard():
+ return Worker.teardown()
+
+
class TestRunInterruptedException(Exception):
"""Raised when a test run should be stopped immediately."""
def __init__(self, reason):
@@ -63,10 +97,12 @@
class LayoutTestRunner(object):
+ instance = None
+
def __init__(self, options, port, printer, results_directory, needs_http=False, needs_websockets=False, needs_web_platform_test_server=False):
self._options = options
self._port = port
- self._printer = printer
+ self.printer = printer
self._results_directory = results_directory
self._needs_http = needs_http
self._needs_websockets = needs_websockets
@@ -100,48 +136,63 @@
# FIXME: rename all variables to test_run_results or some such ...
run_results = TestRunResults(self._expectations, len(test_inputs))
self._current_run_results = run_results
- self._printer.num_tests = len(test_inputs)
- self._printer.num_started = 0
+ self.printer.num_tests = len(test_inputs)
+ self.printer.num_started = 0
if not retrying:
- self._printer.print_expected(run_results, self._expectations.model().get_tests_with_result_type)
+ self.printer.print_expected(run_results, self._expectations.model().get_tests_with_result_type)
- self._printer.write_update('Sharding tests ...')
+ self.printer.write_update('Sharding tests ...')
all_shards = self._sharder.shard_tests(test_inputs, int(self._options.child_processes), self._options.fully_parallel)
- self._printer.print_workers_and_shards(num_workers, len(all_shards))
+ num_workers = min(num_workers, len(all_shards))
+ self.printer.print_workers_and_shards(num_workers, len(all_shards))
if self._options.dry_run:
return run_results
- self._printer.write_update('Starting %s ...' % pluralize(num_workers, "worker"))
+ self.printer.write_update('Starting {} ...'.format(pluralize(num_workers, "worker")))
+ devices = None
+ if getattr(self._port, 'DEVICE_MANAGER', None):
+ devices = dict(
+ available_devices=self._port.DEVICE_MANAGER.AVAILABLE_DEVICES,
+ initialized_devices=self._port.DEVICE_MANAGER.INITIALIZED_DEVICES,
+ )
+
try:
- with message_pool.get(self, self._worker_factory, num_workers, self._port.worker_startup_delay_secs(), self._port.host) as pool:
- pool.run(('test_list', shard.name, shard.test_inputs) for shard in all_shards)
+ LayoutTestRunner.instance = self
+ with TaskPool(
+ workers=num_workers,
+ setup=setup_shard, setupkwargs=dict(
+ port=self._port,
+ devices=devices,
+ results_directory=self._results_directory,
+ retrying=self._retrying,
+ ), teardown=teardown_shard,
+ ) as pool:
+ for shard in all_shards:
+ pool.do(
+ run_shard, shard,
+ callback=lambda value: self._annotate_results_with_additional_failures(value),
+ )
+ pool.wait()
+
except TestRunInterruptedException as e:
_log.warning(e.reason)
run_results.interrupted = True
except KeyboardInterrupt:
- self._printer.flush()
- self._printer.writeln('Interrupted, exiting ...')
+ self.printer.flush()
+ self.printer.writeln('Interrupted, exiting ...')
run_results.keyboard_interrupted = True
except Exception as e:
- _log.debug('%s("%s") raised, exiting' % (e.__class__.__name__, str(e)))
+ _log.debug('{}("{}") raised, exiting'.format(e.__class__.__name__, str(e)))
raise
+ finally:
+ LayoutTestRunner.instance = None
return run_results
- def _worker_factory(self, worker_connection):
- results_directory = self._results_directory
- if self._retrying:
- self._filesystem.maybe_make_directory(self._filesystem.join(self._results_directory, 'retries'))
- results_directory = self._filesystem.join(self._results_directory, 'retries')
- return Worker(worker_connection, results_directory, self._options)
-
- def _handle_did_spawn_worker(self, worker_number):
- self._port.did_spawn_worker(worker_number)
-
def _mark_interrupted_tests_as_skipped(self, run_results):
for test_input in self._test_inputs:
if test_input.test_name not in run_results.results_by_name:
@@ -172,7 +223,7 @@
# This differs from ORWT because it does not include WebProcess crashes.
"Exiting early after %d crashes and %d timeouts." % (run_results.unexpected_crashes, run_results.unexpected_timeouts))
- def _update_summary_with_result(self, run_results, result):
+ def update_summary_with_result(self, result):
if result.type == test_expectations.SKIP:
exp_str = got_str = 'SKIP'
expected = True
@@ -182,15 +233,15 @@
exp_str = self._expectations.model().expectations_to_string(expectations)
got_str = self._expectations.model().expectation_to_string(result.type)
- run_results.add(result, expected)
+ self._current_run_results.add(result, expected)
- self._printer.print_finished_test(result, expected, exp_str, got_str)
+ self.printer.print_finished_test(result, expected, exp_str, got_str)
- self._interrupt_if_at_failure_limits(run_results)
+ self._interrupt_if_at_failure_limits(self._current_run_results)
- def _annotate_results_with_additional_failures(self, run_results, results):
+ def _annotate_results_with_additional_failures(self, results):
for new_result in results:
- existing_result = run_results.results_by_name.get(new_result.test_name)
+ existing_result = self._current_run_results.results_by_name.get(new_result.test_name)
# When running a chunk (--run-chunk), results_by_name contains all the tests, but (confusingly) all_tests only contains those in the chunk that was run,
# and we don't want to modify the results of a test that didn't run. existing_result.test_number is only non-None for tests that ran.
if existing_result and existing_result.test_number is not None:
@@ -202,110 +253,102 @@
_log.warning(' %s -> changed by leak detection from a %s (%s) to a %s (%s)' % (new_result.test_name,
TestExpectations.EXPECTATION_DESCRIPTION[existing_result.type], 'expected' if was_expected else 'unexpected',
TestExpectations.EXPECTATION_DESCRIPTION[new_result.type], 'expected' if now_expected else 'unexpected'))
- run_results.change_result_to_failure(existing_result, new_result, was_expected, now_expected)
+ self._current_run_results.change_result_to_failure(existing_result, new_result, was_expected, now_expected)
def start_servers(self):
if self._needs_http and not self._did_start_http_server and not self._port.is_http_server_running():
- self._printer.write_update('Starting HTTP server ...')
+ self.printer.write_update('Starting HTTP server ...')
self._port.start_http_server()
self._did_start_http_server = True
if self._needs_websockets and not self._did_start_websocket_server and not self._port.is_websocket_server_running():
- self._printer.write_update('Starting WebSocket server ...')
+ self.printer.write_update('Starting WebSocket server ...')
self._port.start_websocket_server()
self._did_start_websocket_server = True
if self._needs_web_platform_test_server and not self._did_start_wpt_server and not self._port.is_wpt_server_running():
- self._printer.write_update('Starting Web Platform Test server ...')
+ self.printer.write_update('Starting Web Platform Test server ...')
self._port.start_web_platform_test_server()
self._did_start_wpt_server = True
def stop_servers(self):
if self._did_start_http_server:
- self._printer.write_update('Stopping HTTP server ...')
+ self.printer.write_update('Stopping HTTP server ...')
self._port.stop_http_server()
self._did_start_http_server = False
if self._did_start_websocket_server:
- self._printer.write_update('Stopping WebSocket server ...')
+ self.printer.write_update('Stopping WebSocket server ...')
self._port.stop_websocket_server()
self._did_start_websocket_server = False
if self._did_start_wpt_server:
- self._printer.write_update('Stopping Web Platform Test server ...')
+ self.printer.write_update('Stopping Web Platform Test server ...')
self._port.stop_web_platform_test_server()
self._did_start_wpt_server = False
- def handle(self, name, source, *args):
- method = getattr(self, '_handle_' + name)
- if method:
- return method(source, *args)
- raise AssertionError('unknown message %s received from %s, args=%s' % (name, source, repr(args)))
- def _handle_started_test(self, worker_name, test_input, test_timeout_sec):
- self._printer.print_started_test(test_input.test_name)
+class Worker(object):
+ instance = None
- def _handle_finished_test(self, worker_name, result, log_messages=[]):
- self._update_summary_with_result(self._current_run_results, result)
+ @classmethod
+ def setup(cls, port=None, results_directory=None):
+ cls.instance = cls(port=port, results_directory=results_directory)
- def _handle_finished_test_group(self, worker_name, overlay_results, log_messages=[]):
- self._annotate_results_with_additional_failures(self._current_run_results, overlay_results)
+ @classmethod
+ def teardown(cls):
+ if cls.instance:
+ cls.instance.stop()
+ cls.instance = None
-
-class Worker(object):
- def __init__(self, caller, results_directory, options):
- self._caller = caller
- self._worker_number = caller.worker_number
- self._name = caller.name
+ def __init__(self, port, results_directory):
+ self._port = port
self._results_directory = results_directory
- self._options = options
- # The remaining fields are initialized in start()
- self._host = None
- self._port = None
- self._batch_size = None
- self._batch_count = None
- self._filesystem = None
+ self._num_tests = 0
+ self._batch_count = 0
self._driver = None
- self._num_tests = 0
+ self._batch_size = self._port.get_option('batch_size') or 0
- def __del__(self):
- self.stop()
+ def run_tests(self, shard):
+ for input in shard.test_inputs:
+ if not TaskPool.Process.working:
+ break
+ Worker.instance.run_test(input, shard.name)
- def start(self):
- """This method is called when the object is starting to be used and it is safe
- for the object to create state that does not need to be pickled (usually this means
- it is called in a child process)."""
- self._host = self._caller.host
- self._filesystem = self._host.filesystem
- self._port = self._host.port_factory.get(self._options.platform, self._options)
+ _log.debug('{} finished test group'.format(TaskPool.Process.name))
- self._batch_count = 0
- self._batch_size = self._options.batch_size or 0
+ if self._driver and self._driver.has_crashed():
+ self._kill_driver()
- def handle(self, name, source, test_list_name, test_inputs):
- assert name == 'test_list'
- for test_input in test_inputs:
- self._run_test(test_input, test_list_name)
+ additional_results = []
+ if not self._port.get_option('run_singly'):
+ additional_results = self._do_post_tests_work(self._driver)
+ return additional_results
- self._finished_test_group(test_inputs)
-
- def _run_test(self, test_input, shard_name):
+ def run_test(self, test_input, shard_name):
self._batch_count += 1
stop_when_done = False
- if self._batch_size > 0 and self._batch_count >= self._batch_size:
+ if 0 < self._batch_size <= self._batch_count:
self._batch_count = 0
stop_when_done = True
test_timeout_sec = self._timeout(test_input)
start = time.time()
- self._caller.post('started_test', test_input, test_timeout_sec)
+ TaskPool.Process.queue.send(TaskPool.Task(
+ handle_started_test, None, TaskPool.Process.name,
+ test_input.test_name,
+ ))
+
result = self._run_test_with_or_without_timeout(test_input, test_timeout_sec, stop_when_done)
result.shard_name = shard_name
- result.worker_name = self._name
+ result.worker_name = TaskPool.Process.name
result.total_run_time = time.time() - start
result.test_number = self._num_tests
self._num_tests += 1
- self._caller.post('finished_test', result)
+ TaskPool.Process.queue.send(TaskPool.Task(
+ handle_finished_test, None, TaskPool.Process.name,
+ result,
+ ))
self._clean_up_after_test(test_input, result)
@@ -320,20 +363,8 @@
additional_results.append(test_results.TestResult(test_name, [test_failures.FailureDocumentLeak(doc_list)]))
return additional_results
- def _finished_test_group(self, test_inputs):
- _log.debug("%s finished test group" % self._name)
-
- if self._driver and self._driver.has_crashed():
- self._kill_driver()
-
- additional_results = []
- if not self._options.run_singly:
- additional_results = self._do_post_tests_work(self._driver)
-
- self._caller.post('finished_test_group', additional_results)
-
def stop(self):
- _log.debug("%s cleaning up" % self._name)
+ _log.debug('{} cleaning up'.format(TaskPool.Process.name))
self._kill_driver()
def _timeout(self, test_input):
@@ -345,7 +376,7 @@
# Note that we need to convert the test timeout from a
# string value in milliseconds to a float for Python.
driver_timeout_sec = 3.0 * float(test_input.timeout) / 1000.0
- if not self._options.run_singly:
+ if not self._port.get_option('run_singly'):
return driver_timeout_sec
thread_padding_sec = 1.0
@@ -358,11 +389,11 @@
driver = self._driver
self._driver = None
if driver:
- _log.debug("%s killing driver" % self._name)
+ _log.debug('{} killing driver'.format(TaskPool.Process.name))
driver.stop()
def _run_test_with_or_without_timeout(self, test_input, timeout, stop_when_done):
- if self._options.run_singly:
+ if self._port.get_option('run_singly'):
return self._run_test_in_another_thread(test_input, timeout, stop_when_done)
return self._run_test_in_this_thread(test_input, stop_when_done)
@@ -377,13 +408,13 @@
self._batch_count = 0
# Print the error message(s).
- _log.debug("%s %s failed:" % (self._name, test_name))
+ _log.debug('{} {} failed:'.format(TaskPool.Process.name, test_name))
for f in result.failures:
- _log.debug("%s %s" % (self._name, f.message()))
+ _log.debug('{} {}'.format(TaskPool.Process.name, f.message()))
elif result.type == test_expectations.SKIP:
- _log.debug("%s %s skipped" % (self._name, test_name))
+ _log.debug('{} {} skipped'.format(TaskPool.Process.name, test_name))
else:
- _log.debug("%s %s passed" % (self._name, test_name))
+ _log.debug("{} {} passed".format(TaskPool.Process.name, test_name))
def _run_test_in_another_thread(self, test_input, thread_timeout_sec, stop_when_done):
"""Run a test in a separate thread, enforcing a hard time limit.
@@ -400,7 +431,7 @@
"""
worker = self
- driver = self._port.create_driver(self._worker_number, self._options.no_timeout)
+ driver = self._port.create_driver(int((TaskPool.Process.name).split('/')[-1]), self._port.get_option('no_timeout'))
class SingleTestThread(threading.Thread):
def __init__(self):
@@ -449,12 +480,15 @@
if self._driver and self._driver.has_crashed():
self._kill_driver()
if not self._driver:
- self._driver = self._port.create_driver(self._worker_number, self._options.no_timeout)
+ self._driver = self._port.create_driver(int((TaskPool.Process.name).split('/')[-1]), self._port.get_option('no_timeout'))
return self._run_single_test(self._driver, test_input, stop_when_done)
def _run_single_test(self, driver, test_input, stop_when_done):
- return single_test_runner.run_single_test(self._port, self._options, self._results_directory,
- self._name, driver, test_input, stop_when_done)
+ return single_test_runner.run_single_test(
+ self._port, self._port._options, self._results_directory,
+ TaskPool.Process.name,
+ driver, test_input, stop_when_done,
+ )
class TestShard(object):