This is an automated email from the ASF dual-hosted git repository.

bcall pushed a commit to branch parallel-autest
in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit cefce09e7019f9da2c91bc11d99fde0bae83784e
Author: Bryan Call <[email protected]>
AuthorDate: Wed Feb 4 10:22:54 2026 -0800

    autest: Add parallel test runner wrapper script

    Add autest-parallel.py which runs autest tests in parallel by spawning
    multiple autest processes with isolated sandboxes and port ranges.

    Key changes:
    - ports.py: Add AUTEST_PORT_OFFSET environment variable support to
      offset the starting port range for each parallel worker, avoiding
      port conflicts
    - autest-parallel.py: New script that discovers tests, partitions them
      across workers, runs them in parallel, and aggregates results

    Usage:
      ./autest-parallel.py -j 8 --ats-bin /opt/ats/bin --sandbox /tmp/sb

    Note: The built-in autest -j flag does not work with ATS tests (causes
    "No Test run defined" failures), hence the need for this wrapper.
    Tests with hardcoded ports (select_ports=False) cannot safely run in
    parallel and may still fail.
---
 tests/autest-parallel.py              | 474 ++++++++++++++++++++++++++++++++++
 tests/gold_tests/autest-site/ports.py |  14 +-
 2 files changed, 487 insertions(+), 1 deletion(-)

diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py
new file mode 100755
index 0000000000..dcecfc0846
--- /dev/null
+++ b/tests/autest-parallel.py
@@ -0,0 +1,474 @@
+#!/usr/bin/env python3
+'''
+Parallel autest runner for Apache Traffic Server.
+
+This script runs autest tests in parallel by spawning multiple autest processes,
+each with a different port offset to avoid port conflicts.
+
+Usage:
+    ./autest-parallel.py -j 4 --sandbox /tmp/autest-parallel
+    ./autest-parallel.py -j 8 --filter "cache-*" --sandbox /tmp/sb
+    ./autest-parallel.py --list    # Just list tests without running
+'''
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import fnmatch
+import os
+import re
+import subprocess
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed
+from dataclasses import dataclass, field
+from pathlib import Path
+from typing import List, Optional
+
+
+@dataclass
+class TestResult:
+    """Result from running a single autest process."""
+    worker_id: int
+    tests: List[str]
+    passed: int = 0
+    failed: int = 0
+    skipped: int = 0
+    warnings: int = 0
+    exceptions: int = 0
+    unknown: int = 0
+    duration: float = 0.0
+    failed_tests: List[str] = field(default_factory=list)
+    output: str = ""
+    return_code: int = 0
+
+
+def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]:
+    """
+    Discover all .test.py files in the test directory.
+
+    Args:
+        test_dir: Path to gold_tests directory
+        filter_patterns: Optional list of glob patterns to filter tests
+
+    Returns:
+        List of test names (without .test.py extension)
+    """
+    tests = []
+    for test_file in test_dir.rglob("*.test.py"):
+        # Extract test name (filename without .test.py)
+        test_name = test_file.stem.replace('.test', '')
+
+        # Apply filters if provided
+        if filter_patterns:
+            if any(fnmatch.fnmatch(test_name, pattern) for pattern in filter_patterns):
+                tests.append(test_name)
+        else:
+            tests.append(test_name)
+
+    return sorted(tests)
+
+
+def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]:
+    """
+    Partition tests into roughly equal groups for parallel execution.
+
+    Args:
+        tests: List of test names
+        num_jobs: Number of parallel workers
+
+    Returns:
+        List of test lists, one per worker
+    """
+    if num_jobs <= 0:
+        num_jobs = 1
+
+    partitions = [[] for _ in range(min(num_jobs, len(tests)))]
+    for i, test in enumerate(tests):
+        partitions[i % len(partitions)].append(test)
+
+    return [p for p in partitions if p]  # Remove empty partitions
+
+
+def strip_ansi(text: str) -> str:
+    """Remove ANSI escape codes from text."""
+    ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
+    return ansi_escape.sub('', text)
+
+
+def parse_autest_output(output: str) -> dict:
+    """
+    Parse autest output to extract pass/fail counts.
+
+    Args:
+        output: Raw autest output string
+
+    Returns:
+        Dictionary with counts for passed, failed, skipped, etc.
+    """
+    result = {
+        'passed': 0,
+        'failed': 0,
+        'skipped': 0,
+        'warnings': 0,
+        'exceptions': 0,
+        'unknown': 0,
+        'failed_tests': []
+    }
+
+    # Strip ANSI codes for easier parsing
+    clean_output = strip_ansi(output)
+
+    # Parse the summary section
+    # Format: " Passed: 2" or " Failed: 0"
+    for line in clean_output.split('\n'):
+        line = line.strip()
+        if 'Passed:' in line:
+            try:
+                result['passed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Failed:' in line and 'test' not in line.lower():
+            try:
+                result['failed'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Skipped:' in line:
+            try:
+                result['skipped'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Warning:' in line:
+            try:
+                result['warnings'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Exception:' in line:
+            try:
+                result['exceptions'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+        elif 'Unknown:' in line:
+            try:
+                result['unknown'] = int(line.split(':')[-1].strip())
+            except ValueError:
+                pass
+
+    # Extract failed test names
+    # Look for lines like "Test: test_name: Failed"
+    failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE)
+    for match in failed_pattern.finditer(clean_output):
+        result['failed_tests'].append(match.group(1))
+
+    return result
+
+
+def run_worker(
+        worker_id: int,
+        tests: List[str],
+        script_dir: Path,
+        sandbox_base: Path,
+        ats_bin: str,
+        extra_args: List[str],
+        port_offset_step: int = 1000,
+        verbose: bool = False
+) -> TestResult:
+    """
+    Run autest on a subset of tests with isolated sandbox and port range.
+
+    Args:
+        worker_id: Worker identifier (0, 1, 2, ...)
+        tests: List of test names to run
+        script_dir: Directory containing autest.sh
+        sandbox_base: Base sandbox directory
+        ats_bin: Path to ATS bin directory
+        extra_args: Additional arguments to pass to autest
+        port_offset_step: Port offset between workers
+        verbose: Whether to print verbose output
+
+    Returns:
+        TestResult with pass/fail counts
+    """
+    start_time = time.time()
+    result = TestResult(worker_id=worker_id, tests=tests)
+
+    # Create worker-specific sandbox
+    sandbox = sandbox_base / f"worker-{worker_id}"
+    sandbox.mkdir(parents=True, exist_ok=True)
+
+    # Calculate port offset for this worker
+    port_offset = worker_id * port_offset_step
+
+    # Build autest command
+    # Use 'uv run autest' directly for better compatibility
+    cmd = [
+        'uv', 'run', 'autest', 'run',
+        '--directory', 'gold_tests',
+        '--ats-bin', ats_bin,
+        '--sandbox', str(sandbox),
+    ]
+
+    # Add test filters
+    cmd.append('--filters')
+    cmd.extend(tests)
+
+    # Add any extra arguments
+    cmd.extend(extra_args)
+
+    # Set up environment with port offset
+    # We set this as an actual OS environment variable so ports.py can read it
+    env = os.environ.copy()
+    env['AUTEST_PORT_OFFSET'] = str(port_offset)
+
+    if verbose:
+        print(f"[Worker {worker_id}] Running {len(tests)} tests with port offset {port_offset}")
+        print(f"[Worker {worker_id}] Command: {' '.join(cmd)}")
+
+    try:
+        proc = subprocess.run(
+            cmd,
+            cwd=script_dir,
+            capture_output=True,
+            text=True,
+            env=env,
+            timeout=3600  # 1 hour timeout per worker
+        )
+        result.output = proc.stdout + proc.stderr
+        result.return_code = proc.returncode
+
+        # Parse results
+        parsed = parse_autest_output(result.output)
+        result.passed = parsed['passed']
+        result.failed = parsed['failed']
+        result.skipped = parsed['skipped']
+        result.warnings = parsed['warnings']
+        result.exceptions = parsed['exceptions']
+        result.unknown = parsed['unknown']
+        result.failed_tests = parsed['failed_tests']
+
+    except subprocess.TimeoutExpired:
+        result.output = "TIMEOUT: Worker exceeded 1 hour timeout"
+        result.return_code = -1
+        result.failed = len(tests)
+    except Exception as e:
+        result.output = f"ERROR: {str(e)}"
+        result.return_code = -1
+        result.failed = len(tests)
+
+    result.duration = time.time() - start_time
+    return result
+
+
+def print_summary(results: List[TestResult], total_duration: float):
+    """Print aggregated results from all workers."""
+    total_passed = sum(r.passed for r in results)
+    total_failed = sum(r.failed for r in results)
+    total_skipped = sum(r.skipped for r in results)
+    total_warnings = sum(r.warnings for r in results)
+    total_exceptions = sum(r.exceptions for r in results)
+    total_unknown = sum(r.unknown for r in results)
+    total_tests = total_passed + total_failed + total_skipped + total_warnings + total_exceptions + total_unknown
+
+    all_failed_tests = []
+    for r in results:
+        all_failed_tests.extend(r.failed_tests)
+
+    print("\n" + "=" * 70)
+    print("PARALLEL AUTEST SUMMARY")
+    print("=" * 70)
+    print(f"Workers: {len(results)}")
+    print(f"Total tests: {total_tests}")
+    print(f"Duration: {total_duration:.1f}s")
+    print("-" * 70)
+    print(f" Passed: {total_passed}")
+    print(f" Failed: {total_failed}")
+    print(f" Skipped: {total_skipped}")
+    print(f" Warnings: {total_warnings}")
+    print(f" Exceptions: {total_exceptions}")
+    print(f" Unknown: {total_unknown}")
+
+    if all_failed_tests:
+        print("-" * 70)
+        print("FAILED TESTS:")
+        for test in sorted(all_failed_tests):
+            print(f" - {test}")
+
+    print("=" * 70)
+
+    # Per-worker summary
+    print("\nPer-worker breakdown:")
+    for r in results:
+        status = "OK" if r.failed == 0 and r.exceptions == 0 else "FAIL"
+        print(f" Worker {r.worker_id}: {r.passed} passed, {r.failed} failed, "
+              f"{r.skipped} skipped ({r.duration:.1f}s) [{status}]")
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description='Run autest tests in parallel',
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        epilog='''
+Examples:
+  # Run all tests with 4 parallel workers
+  %(prog)s -j 4 --ats-bin /opt/ats/bin --sandbox /tmp/autest
+
+  # Run specific tests
+  %(prog)s -j 2 --filter "cache-*" --filter "tls-*" --ats-bin /opt/ats/bin --sandbox /tmp/autest
+
+  # List tests without running
+  %(prog)s --list --ats-bin /opt/ats/bin
+'''
+    )
+
+    parser.add_argument(
+        '-j', '--jobs',
+        type=int,
+        default=os.cpu_count() or 4,
+        help='Number of parallel workers (default: CPU count)'
+    )
+    parser.add_argument(
+        '--ats-bin',
+        required=True,
+        help='Path to ATS bin directory'
+    )
+    parser.add_argument(
+        '--sandbox',
+        default='/tmp/autest-parallel',
+        help='Base sandbox directory (default: /tmp/autest-parallel)'
+    )
+    parser.add_argument(
+        '-f', '--filter',
+        action='append',
+        dest='filters',
+        help='Filter tests by glob pattern (can be specified multiple times)'
+    )
+    parser.add_argument(
+        '--list',
+        action='store_true',
+        help='List tests without running'
+    )
+    parser.add_argument(
+        '--port-offset-step',
+        type=int,
+        default=1000,
+        help='Port offset between workers (default: 1000)'
+    )
+    parser.add_argument(
+        '-v', '--verbose',
+        action='store_true',
+        help='Verbose output'
+    )
+    parser.add_argument(
+        '--test-dir',
+        default='gold_tests',
+        help='Test directory relative to script location (default: gold_tests)'
+    )
+    parser.add_argument(
+        'extra_args',
+        nargs='*',
+        help='Additional arguments to pass to autest'
+    )
+
+    args = parser.parse_args()
+
+    # Determine paths
+    script_dir = Path(__file__).parent.resolve()
+    test_dir = script_dir / args.test_dir
+
+    if not test_dir.exists():
+        print(f"Error: Test directory not found: {test_dir}", file=sys.stderr)
+        sys.exit(1)
+
+    # Discover tests
+    tests = discover_tests(test_dir, args.filters)
+
+    if not tests:
+        print("No tests found matching the specified filters.", file=sys.stderr)
+        sys.exit(1)
+
+    print(f"Found {len(tests)} tests")
+
+    if args.list:
+        print("\nTests:")
+        for test in tests:
+            print(f" {test}")
+        sys.exit(0)
+
+    # Partition tests
+    num_jobs = min(args.jobs, len(tests))
+    partitions = partition_tests(tests, num_jobs)
+
+    print(f"Running with {len(partitions)} parallel workers")
+    print(f"Port offset step: {args.port_offset_step}")
+    print(f"Sandbox: {args.sandbox}")
+
+    # Create sandbox base directory
+    sandbox_base = Path(args.sandbox)
+    sandbox_base.mkdir(parents=True, exist_ok=True)
+
+    # Run workers in parallel
+    start_time = time.time()
+    results: List[TestResult] = []
+
+    with ProcessPoolExecutor(max_workers=len(partitions)) as executor:
+        futures = {}
+        for worker_id, worker_tests in enumerate(partitions):
+            future = executor.submit(
+                run_worker,
+                worker_id=worker_id,
+                tests=worker_tests,
+                script_dir=script_dir,
+                sandbox_base=sandbox_base,
+                ats_bin=args.ats_bin,
+                extra_args=args.extra_args or [],
+                port_offset_step=args.port_offset_step,
+                verbose=args.verbose
+            )
+            futures[future] = worker_id
+
+        # Collect results as they complete
+        for future in as_completed(futures):
+            worker_id = futures[future]
+            try:
+                result = future.result()
+                results.append(result)
+                status = "PASS" if result.failed == 0 else "FAIL"
+                print(f"[Worker {worker_id}] Completed: {result.passed} passed, "
+                      f"{result.failed} failed ({result.duration:.1f}s) [{status}]")
+            except Exception as e:
+                print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr)
+                results.append(TestResult(
+                    worker_id=worker_id,
+                    tests=partitions[worker_id],
+                    failed=len(partitions[worker_id]),
+                    output=str(e)
+                ))
+
+    total_duration = time.time() - start_time
+
+    # Sort results by worker_id for consistent output
+    results.sort(key=lambda r: r.worker_id)
+
+    # Print summary
+    print_summary(results, total_duration)
+
+    # Exit with non-zero if any tests failed
+    total_failed = sum(r.failed + r.exceptions for r in results)
+    sys.exit(1 if total_failed > 0 else 0)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py
index bc4de3c1e3..6e402ca0ea 100644
--- a/tests/gold_tests/autest-site/ports.py
+++ b/tests/gold_tests/autest-site/ports.py
@@ -145,6 +145,11 @@ def _get_listening_ports() -> Set[int]:
 def _setup_port_queue(amount=1000):
     """
     Build up the set of ports that the OS in theory will not use.
+
+    The AUTEST_PORT_OFFSET environment variable can be used to offset the
+    starting port range. This is useful when running multiple autest processes
+    in parallel to avoid port conflicts. Each parallel worker should use a
+    different offset (e.g., 0, 1000, 2000, etc.).
     """
     global g_ports
     if g_ports is None:
@@ -154,6 +159,12 @@ def _setup_port_queue(amount=1000):
         # The queue has already been populated.
         host.WriteDebug('_setup_port_queue', f"Queue was previously populated. Queue size: {g_ports.qsize()}")
         return
+
+    # Get port offset for parallel execution support
+    port_offset = int(os.environ.get('AUTEST_PORT_OFFSET', 0))
+    if port_offset > 0:
+        host.WriteVerbose('_setup_port_queue', f"Using port offset: {port_offset}")
+
     try:
         # Use sysctl to find the range of ports that the OS publishes it uses.
         # some docker setups don't have sbin setup correctly
@@ -177,7 +188,8 @@ def _setup_port_queue(amount=1000):
     listening_ports = _get_listening_ports()
     if rmax > amount:
         # Fill in ports, starting above the upper OS-usable port range.
-        port = dmax + 1
+        # Add port_offset to support parallel test execution.
+        port = dmax + 1 + port_offset
         while port < 65536 and g_ports.qsize() < amount:
            if PortOpen(port, listening_ports=listening_ports):
                host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}")
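
For reference, a rough sketch of what a single worker ends up running under the
Usage line above (worker 1 with the default --port-offset-step of 1000; the
--ats-bin and --sandbox paths come from the commit message and the test names
after --filters are placeholders):

  # Worker 1: sandbox /tmp/sb/worker-1, port offset 1 * 1000 = 1000
  AUTEST_PORT_OFFSET=1000 uv run autest run \
      --directory gold_tests \
      --ats-bin /opt/ats/bin \
      --sandbox /tmp/sb/worker-1 \
      --filters some_test another_test

ports.py reads AUTEST_PORT_OFFSET when it builds its port queue, so each
worker starts its port search at a different point and the workers are much
less likely to collide on ports.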
