https://github.com/python/cpython/commit/bfc1d2504c183a9464e65c290e48516d176ea41f
commit: bfc1d2504c183a9464e65c290e48516d176ea41f
branch: main
author: sobolevn <[email protected]>
committer: sobolevn <[email protected]>
date: 2024-11-04T13:15:57+03:00
summary:
gh-109413: Add more type hints to `libregrtest` (#126352)
files:
M Lib/test/libregrtest/findtests.py
M Lib/test/libregrtest/main.py
M Lib/test/libregrtest/pgo.py
M Lib/test/libregrtest/refleak.py
M Lib/test/libregrtest/result.py
M Lib/test/libregrtest/results.py
M Lib/test/libregrtest/runtests.py
M Lib/test/libregrtest/setup.py
M Lib/test/libregrtest/tsan.py
M Lib/test/libregrtest/utils.py
M Lib/test/libregrtest/worker.py
M Tools/requirements-dev.txt
diff --git a/Lib/test/libregrtest/findtests.py
b/Lib/test/libregrtest/findtests.py
index 4ac95e23a56b8f..f01c1240774707 100644
--- a/Lib/test/libregrtest/findtests.py
+++ b/Lib/test/libregrtest/findtests.py
@@ -1,6 +1,7 @@
import os
import sys
import unittest
+from collections.abc import Container
from test import support
@@ -34,7 +35,7 @@ def findtestdir(path: StrPath | None = None) -> StrPath:
return path or os.path.dirname(os.path.dirname(__file__)) or os.curdir
-def findtests(*, testdir: StrPath | None = None, exclude=(),
+def findtests(*, testdir: StrPath | None = None, exclude: Container[str] = (),
split_test_dirs: set[TestName] = SPLITTESTDIRS,
base_mod: str = "") -> TestList:
"""Return a list of all applicable test modules."""
@@ -60,8 +61,9 @@ def findtests(*, testdir: StrPath | None = None, exclude=(),
return sorted(tests)
-def split_test_packages(tests, *, testdir: StrPath | None = None, exclude=(),
- split_test_dirs=SPLITTESTDIRS):
+def split_test_packages(tests, *, testdir: StrPath | None = None,
+ exclude: Container[str] = (),
+ split_test_dirs=SPLITTESTDIRS) -> list[TestName]:
testdir = findtestdir(testdir)
splitted = []
for name in tests:
@@ -75,9 +77,9 @@ def split_test_packages(tests, *, testdir: StrPath | None =
None, exclude=(),
return splitted
-def _list_cases(suite):
+def _list_cases(suite: unittest.TestSuite) -> None:
for test in suite:
- if isinstance(test, unittest.loader._FailedTest):
+ if isinstance(test, unittest.loader._FailedTest): # type:
ignore[attr-defined]
continue
if isinstance(test, unittest.TestSuite):
_list_cases(test)
@@ -87,7 +89,7 @@ def _list_cases(suite):
def list_cases(tests: TestTuple, *,
match_tests: TestFilter | None = None,
- test_dir: StrPath | None = None):
+ test_dir: StrPath | None = None) -> None:
support.verbose = False
set_match_tests(match_tests)
diff --git a/Lib/test/libregrtest/main.py b/Lib/test/libregrtest/main.py
index 11ebb09fbb4745..133eba8ffe8e69 100644
--- a/Lib/test/libregrtest/main.py
+++ b/Lib/test/libregrtest/main.py
@@ -6,6 +6,7 @@
import sysconfig
import time
import trace
+from typing import NoReturn
from test.support import os_helper, MS_WINDOWS, flush_std_streams
@@ -154,7 +155,7 @@ def __init__(self, ns: Namespace, _add_python_opts: bool =
False):
self.next_single_test: TestName | None = None
self.next_single_filename: StrPath | None = None
- def log(self, line=''):
+ def log(self, line: str = '') -> None:
self.logger.log(line)
def find_tests(self, tests: TestList | None = None) -> tuple[TestTuple,
TestList | None]:
@@ -230,11 +231,11 @@ def find_tests(self, tests: TestList | None = None) ->
tuple[TestTuple, TestList
return (tuple(selected), tests)
@staticmethod
- def list_tests(tests: TestTuple):
+ def list_tests(tests: TestTuple) -> None:
for name in tests:
print(name)
- def _rerun_failed_tests(self, runtests: RunTests):
+ def _rerun_failed_tests(self, runtests: RunTests) -> RunTests:
# Configure the runner to re-run tests
if self.num_workers == 0 and not self.single_process:
# Always run tests in fresh processes to have more deterministic
@@ -266,7 +267,7 @@ def _rerun_failed_tests(self, runtests: RunTests):
self.run_tests_sequentially(runtests)
return runtests
- def rerun_failed_tests(self, runtests: RunTests):
+ def rerun_failed_tests(self, runtests: RunTests) -> None:
if self.python_cmd:
# Temp patch for https://github.com/python/cpython/issues/94052
self.log(
@@ -335,7 +336,7 @@ def run_bisect(self, runtests: RunTests) -> None:
if not self._run_bisect(runtests, name, progress):
return
- def display_result(self, runtests):
+ def display_result(self, runtests: RunTests) -> None:
# If running the test suite for PGO then no one cares about results.
if runtests.pgo:
return
@@ -365,7 +366,7 @@ def run_test(
return result
- def run_tests_sequentially(self, runtests) -> None:
+ def run_tests_sequentially(self, runtests: RunTests) -> None:
if self.coverage:
tracer = trace.Trace(trace=False, count=True)
else:
@@ -422,7 +423,7 @@ def run_tests_sequentially(self, runtests) -> None:
if previous_test:
print(previous_test)
- def get_state(self):
+ def get_state(self) -> str:
state = self.results.get_state(self.fail_env_changed)
if self.first_state:
state = f'{self.first_state} then {state}'
@@ -452,7 +453,7 @@ def finalize_tests(self, coverage: trace.CoverageResults |
None) -> None:
if self.junit_filename:
self.results.write_junit(self.junit_filename)
- def display_summary(self):
+ def display_summary(self) -> None:
duration = time.perf_counter() - self.logger.start_time
filtered = bool(self.match_tests)
@@ -466,7 +467,7 @@ def display_summary(self):
state = self.get_state()
print(f"Result: {state}")
- def create_run_tests(self, tests: TestTuple):
+ def create_run_tests(self, tests: TestTuple) -> RunTests:
return RunTests(
tests,
fail_fast=self.fail_fast,
@@ -674,9 +675,9 @@ def _execute_python(self, cmd, environ):
f"Command: {cmd_text}")
# continue executing main()
- def _add_python_opts(self):
- python_opts = []
- regrtest_opts = []
+ def _add_python_opts(self) -> None:
+ python_opts: list[str] = []
+ regrtest_opts: list[str] = []
environ, keep_environ = self._add_cross_compile_opts(regrtest_opts)
if self.ci_mode:
@@ -709,7 +710,7 @@ def _init(self):
self.tmp_dir = get_temp_dir(self.tmp_dir)
- def main(self, tests: TestList | None = None):
+ def main(self, tests: TestList | None = None) -> NoReturn:
if self.want_add_python_opts:
self._add_python_opts()
@@ -738,7 +739,7 @@ def main(self, tests: TestList | None = None):
sys.exit(exitcode)
-def main(tests=None, _add_python_opts=False, **kwargs):
+def main(tests=None, _add_python_opts=False, **kwargs) -> NoReturn:
"""Run the Python suite."""
ns = _parse_args(sys.argv[1:], **kwargs)
Regrtest(ns, _add_python_opts=_add_python_opts).main(tests=tests)
diff --git a/Lib/test/libregrtest/pgo.py b/Lib/test/libregrtest/pgo.py
index e3a6927be5db1d..f762345c88cde3 100644
--- a/Lib/test/libregrtest/pgo.py
+++ b/Lib/test/libregrtest/pgo.py
@@ -50,7 +50,7 @@
'test_xml_etree_c',
]
-def setup_pgo_tests(cmdline_args, pgo_extended: bool):
+def setup_pgo_tests(cmdline_args, pgo_extended: bool) -> None:
if not cmdline_args and not pgo_extended:
# run default set of tests for PGO training
cmdline_args[:] = PGO_TESTS[:]
diff --git a/Lib/test/libregrtest/refleak.py b/Lib/test/libregrtest/refleak.py
index fa447a4336a399..e783475cc7a36b 100644
--- a/Lib/test/libregrtest/refleak.py
+++ b/Lib/test/libregrtest/refleak.py
@@ -262,7 +262,7 @@ def dash_R_cleanup(fs, ps, pic, zdc, abcs):
sys._clear_internal_caches()
-def warm_caches():
+def warm_caches() -> None:
# char cache
s = bytes(range(256))
for i in range(256):
diff --git a/Lib/test/libregrtest/result.py b/Lib/test/libregrtest/result.py
index 74eae40440435d..7553efe5e8abeb 100644
--- a/Lib/test/libregrtest/result.py
+++ b/Lib/test/libregrtest/result.py
@@ -149,6 +149,7 @@ def __str__(self) -> str:
case State.DID_NOT_RUN:
return f"{self.test_name} ran no tests"
case State.TIMEOUT:
+ assert self.duration is not None, "self.duration is None"
return f"{self.test_name} timed out
({format_duration(self.duration)})"
case _:
raise ValueError("unknown result state: {state!r}")
diff --git a/Lib/test/libregrtest/results.py b/Lib/test/libregrtest/results.py
index 0e28435bc7d629..4f3e84282dc5dc 100644
--- a/Lib/test/libregrtest/results.py
+++ b/Lib/test/libregrtest/results.py
@@ -71,7 +71,7 @@ def get_state(self, fail_env_changed: bool) -> str:
return ', '.join(state)
- def get_exitcode(self, fail_env_changed, fail_rerun):
+ def get_exitcode(self, fail_env_changed: bool, fail_rerun: bool) -> int:
exitcode = 0
if self.bad:
exitcode = EXITCODE_BAD_TEST
@@ -87,7 +87,7 @@ def get_exitcode(self, fail_env_changed, fail_rerun):
exitcode = EXITCODE_BAD_TEST
return exitcode
- def accumulate_result(self, result: TestResult, runtests: RunTests):
+ def accumulate_result(self, result: TestResult, runtests: RunTests) ->
None:
test_name = result.test_name
rerun = runtests.rerun
fail_env_changed = runtests.fail_env_changed
@@ -135,7 +135,7 @@ def get_coverage_results(self) -> trace.CoverageResults:
counts = {loc: 1 for loc in self.covered_lines}
return trace.CoverageResults(counts=counts)
- def need_rerun(self):
+ def need_rerun(self) -> bool:
return bool(self.rerun_results)
def prepare_rerun(self, *, clear: bool = True) -> tuple[TestTuple,
FilterDict]:
@@ -158,7 +158,7 @@ def prepare_rerun(self, *, clear: bool = True) ->
tuple[TestTuple, FilterDict]:
return (tuple(tests), match_tests_dict)
- def add_junit(self, xml_data: list[str]):
+ def add_junit(self, xml_data: list[str]) -> None:
import xml.etree.ElementTree as ET
for e in xml_data:
try:
@@ -167,7 +167,7 @@ def add_junit(self, xml_data: list[str]):
print(xml_data, file=sys.__stderr__)
raise
- def write_junit(self, filename: StrPath):
+ def write_junit(self, filename: StrPath) -> None:
if not self.testsuite_xml:
# Don't create empty XML file
return
@@ -192,7 +192,7 @@ def write_junit(self, filename: StrPath):
for s in ET.tostringlist(root):
f.write(s)
- def display_result(self, tests: TestTuple, quiet: bool, print_slowest:
bool):
+ def display_result(self, tests: TestTuple, quiet: bool, print_slowest:
bool) -> None:
if print_slowest:
self.test_times.sort(reverse=True)
print()
@@ -234,7 +234,7 @@ def display_result(self, tests: TestTuple, quiet: bool,
print_slowest: bool):
print()
print("Test suite interrupted by signal SIGINT.")
- def display_summary(self, first_runtests: RunTests, filtered: bool):
+ def display_summary(self, first_runtests: RunTests, filtered: bool) ->
None:
# Total tests
stats = self.stats
text = f'run={stats.tests_run:,}'
diff --git a/Lib/test/libregrtest/runtests.py b/Lib/test/libregrtest/runtests.py
index 8e9779524c56a2..cd1ce8080a04df 100644
--- a/Lib/test/libregrtest/runtests.py
+++ b/Lib/test/libregrtest/runtests.py
@@ -5,12 +5,12 @@
import shlex
import subprocess
import sys
-from typing import Any
+from typing import Any, Iterator
from test import support
from .utils import (
- StrPath, StrJSON, TestTuple, TestFilter, FilterTuple, FilterDict)
+ StrPath, StrJSON, TestTuple, TestName, TestFilter, FilterTuple, FilterDict)
class JsonFileType:
@@ -41,8 +41,8 @@ def configure_subprocess(self, popen_kwargs: dict) -> None:
popen_kwargs['startupinfo'] = startupinfo
@contextlib.contextmanager
- def inherit_subprocess(self):
- if self.file_type == JsonFileType.WINDOWS_HANDLE:
+ def inherit_subprocess(self) -> Iterator[None]:
+ if sys.platform == 'win32' and self.file_type ==
JsonFileType.WINDOWS_HANDLE:
os.set_handle_inheritable(self.file, True)
try:
yield
@@ -106,25 +106,25 @@ def copy(self, **override) -> 'RunTests':
state.update(override)
return RunTests(**state)
- def create_worker_runtests(self, **override):
+ def create_worker_runtests(self, **override) -> WorkerRunTests:
state = dataclasses.asdict(self)
state.update(override)
return WorkerRunTests(**state)
- def get_match_tests(self, test_name) -> FilterTuple | None:
+ def get_match_tests(self, test_name: TestName) -> FilterTuple | None:
if self.match_tests_dict is not None:
return self.match_tests_dict.get(test_name, None)
else:
return None
- def get_jobs(self):
+ def get_jobs(self) -> int | None:
# Number of run_single_test() calls needed to run all tests.
# None means that there is not bound limit (--forever option).
if self.forever:
return None
return len(self.tests)
- def iter_tests(self):
+ def iter_tests(self) -> Iterator[TestName]:
if self.forever:
while True:
yield from self.tests
diff --git a/Lib/test/libregrtest/setup.py b/Lib/test/libregrtest/setup.py
index 9e9741493e9a5b..ba57f06b4841d4 100644
--- a/Lib/test/libregrtest/setup.py
+++ b/Lib/test/libregrtest/setup.py
@@ -25,9 +25,10 @@ def setup_test_dir(testdir: str | None) -> None:
sys.path.insert(0, os.path.abspath(testdir))
-def setup_process():
+def setup_process() -> None:
fix_umask()
+ assert sys.__stderr__ is not None, "sys.__stderr__ is None"
try:
stderr_fd = sys.__stderr__.fileno()
except (ValueError, AttributeError):
@@ -35,7 +36,7 @@ def setup_process():
# and ValueError on a closed stream.
#
# Catch AttributeError for stderr being None.
- stderr_fd = None
+ pass
else:
# Display the Python traceback on fatal errors (e.g. segfault)
faulthandler.enable(all_threads=True, file=stderr_fd)
@@ -68,7 +69,7 @@ def setup_process():
for index, path in enumerate(module.__path__):
module.__path__[index] = os.path.abspath(path)
if getattr(module, '__file__', None):
- module.__file__ = os.path.abspath(module.__file__)
+ module.__file__ = os.path.abspath(module.__file__) # type:
ignore[type-var]
if hasattr(sys, 'addaudithook'):
# Add an auditing hook for all tests to ensure PySys_Audit is tested
@@ -87,7 +88,7 @@ def _test_audit_hook(name, args):
os.environ.setdefault(UNICODE_GUARD_ENV, FS_NONASCII)
-def setup_tests(runtests: RunTests):
+def setup_tests(runtests: RunTests) -> None:
support.verbose = runtests.verbose
support.failfast = runtests.fail_fast
support.PGO = runtests.pgo
diff --git a/Lib/test/libregrtest/tsan.py b/Lib/test/libregrtest/tsan.py
index dd18ae2584f5d8..822ac0f4044d9e 100644
--- a/Lib/test/libregrtest/tsan.py
+++ b/Lib/test/libregrtest/tsan.py
@@ -28,6 +28,6 @@
]
-def setup_tsan_tests(cmdline_args):
+def setup_tsan_tests(cmdline_args) -> None:
if not cmdline_args:
cmdline_args[:] = TSAN_TESTS[:]
diff --git a/Lib/test/libregrtest/utils.py b/Lib/test/libregrtest/utils.py
index d6be4ad049d14a..1ecfc61af9e39d 100644
--- a/Lib/test/libregrtest/utils.py
+++ b/Lib/test/libregrtest/utils.py
@@ -58,7 +58,7 @@
FilterDict = dict[TestName, FilterTuple]
-def format_duration(seconds):
+def format_duration(seconds: float) -> str:
ms = math.ceil(seconds * 1e3)
seconds, ms = divmod(ms, 1000)
minutes, seconds = divmod(seconds, 60)
@@ -92,7 +92,7 @@ def strip_py_suffix(names: list[str] | None) -> None:
names[idx] = basename
-def plural(n, singular, plural=None):
+def plural(n: int, singular: str, plural: str | None = None) -> str:
if n == 1:
return singular
elif plural is not None:
@@ -101,7 +101,7 @@ def plural(n, singular, plural=None):
return singular + 's'
-def count(n, word):
+def count(n: int, word: str) -> str:
if n == 1:
return f"{n} {word}"
else:
@@ -123,14 +123,14 @@ def printlist(x, width=70, indent=4, file=None):
file=file)
-def print_warning(msg):
+def print_warning(msg: str) -> None:
support.print_warning(msg)
-orig_unraisablehook = None
+orig_unraisablehook: Callable[..., None] | None = None
-def regrtest_unraisable_hook(unraisable):
+def regrtest_unraisable_hook(unraisable) -> None:
global orig_unraisablehook
support.environment_altered = True
support.print_warning("Unraisable exception")
@@ -138,22 +138,23 @@ def regrtest_unraisable_hook(unraisable):
try:
support.flush_std_streams()
sys.stderr = support.print_warning.orig_stderr
+ assert orig_unraisablehook is not None, "orig_unraisablehook not set"
orig_unraisablehook(unraisable)
sys.stderr.flush()
finally:
sys.stderr = old_stderr
-def setup_unraisable_hook():
+def setup_unraisable_hook() -> None:
global orig_unraisablehook
orig_unraisablehook = sys.unraisablehook
sys.unraisablehook = regrtest_unraisable_hook
-orig_threading_excepthook = None
+orig_threading_excepthook: Callable[..., None] | None = None
-def regrtest_threading_excepthook(args):
+def regrtest_threading_excepthook(args) -> None:
global orig_threading_excepthook
support.environment_altered = True
support.print_warning(f"Uncaught thread exception:
{args.exc_type.__name__}")
@@ -161,13 +162,14 @@ def regrtest_threading_excepthook(args):
try:
support.flush_std_streams()
sys.stderr = support.print_warning.orig_stderr
+ assert orig_threading_excepthook is not None,
"orig_threading_excepthook not set"
orig_threading_excepthook(args)
sys.stderr.flush()
finally:
sys.stderr = old_stderr
-def setup_threading_excepthook():
+def setup_threading_excepthook() -> None:
global orig_threading_excepthook
import threading
orig_threading_excepthook = threading.excepthook
@@ -476,7 +478,7 @@ def get_temp_dir(tmp_dir: StrPath | None = None) -> StrPath:
return os.path.abspath(tmp_dir)
-def fix_umask():
+def fix_umask() -> None:
if support.is_emscripten:
# Emscripten has default umask 0o777, which breaks some tests.
# see https://github.com/emscripten-core/emscripten/issues/17269
@@ -572,7 +574,8 @@ def abs_module_name(test_name: TestName, test_dir: StrPath
| None) -> TestName:
'setUpModule', 'tearDownModule',
))
-def normalize_test_name(test_full_name, *, is_error=False):
+def normalize_test_name(test_full_name: str, *,
+ is_error: bool = False) -> str | None:
short_name = test_full_name.split(" ")[0]
if is_error and short_name in _TEST_LIFECYCLE_HOOKS:
if test_full_name.startswith(('setUpModule (', 'tearDownModule (')):
@@ -593,7 +596,7 @@ def normalize_test_name(test_full_name, *, is_error=False):
return short_name
-def adjust_rlimit_nofile():
+def adjust_rlimit_nofile() -> None:
"""
On macOS the default fd limit (RLIMIT_NOFILE) is sometimes too low (256)
for our test suite to succeed. Raise it to something more reasonable. 1024
@@ -619,17 +622,17 @@ def adjust_rlimit_nofile():
f"{new_fd_limit}: {err}.")
-def get_host_runner():
+def get_host_runner() -> str:
if (hostrunner := os.environ.get("_PYTHON_HOSTRUNNER")) is None:
hostrunner = sysconfig.get_config_var("HOSTRUNNER")
return hostrunner
-def is_cross_compiled():
+def is_cross_compiled() -> bool:
return ('_PYTHON_HOST_PLATFORM' in os.environ)
-def format_resources(use_resources: Iterable[str]):
+def format_resources(use_resources: Iterable[str]) -> str:
use_resources = set(use_resources)
all_resources = set(ALL_RESOURCES)
@@ -654,7 +657,7 @@ def format_resources(use_resources: Iterable[str]):
def display_header(use_resources: tuple[str, ...],
- python_cmd: tuple[str, ...] | None):
+ python_cmd: tuple[str, ...] | None) -> None:
# Print basic platform information
print("==", platform.python_implementation(), *sys.version.split())
print("==", platform.platform(aliased=True),
@@ -732,7 +735,7 @@ def display_header(use_resources: tuple[str, ...],
print(flush=True)
-def cleanup_temp_dir(tmp_dir: StrPath):
+def cleanup_temp_dir(tmp_dir: StrPath) -> None:
import glob
path = os.path.join(glob.escape(tmp_dir), TMP_PREFIX + '*')
@@ -763,5 +766,5 @@ def _sanitize_xml_replace(regs):
return ''.join(f'\\x{ord(ch):02x}' if ch <= '\xff' else ascii(ch)[1:-1]
for ch in text)
-def sanitize_xml(text):
+def sanitize_xml(text: str) -> str:
return ILLEGAL_XML_CHARS_RE.sub(_sanitize_xml_replace, text)
diff --git a/Lib/test/libregrtest/worker.py b/Lib/test/libregrtest/worker.py
index 86cc30835fdbda..da24760a82c6c6 100644
--- a/Lib/test/libregrtest/worker.py
+++ b/Lib/test/libregrtest/worker.py
@@ -104,7 +104,7 @@ def worker_process(worker_json: StrJSON) -> NoReturn:
sys.exit(0)
-def main():
+def main() -> NoReturn:
if len(sys.argv) != 2:
print("usage: python -m test.libregrtest.worker JSON")
sys.exit(1)
diff --git a/Tools/requirements-dev.txt b/Tools/requirements-dev.txt
index 57f0b982b00f5d..e5badaccadda40 100644
--- a/Tools/requirements-dev.txt
+++ b/Tools/requirements-dev.txt
@@ -1,6 +1,6 @@
# Requirements file for external linters and checks we run on
# Tools/clinic, Tools/cases_generator/, and Tools/peg_generator/ in CI
-mypy==1.12
+mypy==1.13
# needed for peg_generator:
types-psutil==6.0.0.20240901
_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: [email protected]