This is an automated email from the ASF dual-hosted git repository. bcall pushed a commit to branch parallel-autest in repository https://gitbox.apache.org/repos/asf/trafficserver.git
commit 419eba279824985ec787fb2dcd740b665c92cf93 Author: Bryan Call <[email protected]> AuthorDate: Sat Feb 7 12:01:27 2026 -0800 autest: Enhance parallel runner with serial test support and LPT scheduling - Add serial_tests.txt for tests that cannot run in parallel (hardcoded ports) - Implement LPT (Longest Processing Time) load balancing using timing data - Add --collect-timings flag to record per-test durations for future runs - Fix port offset for low-range ports in ports.py (was missing offset) - Convert config.test.py and copy_config.test.py to use dynamic ports (removed select_ports=False) - Add run_single_test() for serial execution with timing --- tests/autest-parallel.py | 598 ++++++++++++++++++++++++----- tests/gold_tests/autest-site/ports.py | 3 +- tests/gold_tests/basic/config.test.py | 3 +- tests/gold_tests/basic/copy_config.test.py | 8 +- tests/serial_tests.txt | 14 + 5 files changed, 512 insertions(+), 114 deletions(-) diff --git a/tests/autest-parallel.py b/tests/autest-parallel.py index dcecfc0846..7a9e718c26 100755 --- a/tests/autest-parallel.py +++ b/tests/autest-parallel.py @@ -28,6 +28,7 @@ Usage: import argparse import fnmatch +import json import os import re import subprocess @@ -35,8 +36,16 @@ import sys import time from concurrent.futures import ProcessPoolExecutor, as_completed from dataclasses import dataclass, field +from datetime import datetime from pathlib import Path -from typing import List, Optional +from typing import Dict, List, Optional, Tuple + +# Default timing file location +DEFAULT_TIMING_FILE = Path(__file__).parent / "test-timings.json" +# Default serial tests file location +DEFAULT_SERIAL_TESTS_FILE = Path(__file__).parent / "serial_tests.txt" +# Default estimate for unknown tests (seconds) +DEFAULT_TEST_TIME = 15.0 @dataclass @@ -52,8 +61,10 @@ class TestResult: unknown: int = 0 duration: float = 0.0 failed_tests: List[str] = field(default_factory=list) + test_timings: Dict[str, float] = field(default_factory=dict) output: str = "" return_code: int = 0 + is_serial: bool = False def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) -> List[str]: @@ -82,9 +93,65 @@ def discover_tests(test_dir: Path, filter_patterns: Optional[List[str]] = None) return sorted(tests) +def load_serial_tests(serial_file: Path) -> set: + """ + Load list of tests that must run serially from a file. + + The file format is one test name per line, with # for comments. + Test names can be: + - Simple names: test_name (matches any test containing this) + - Full paths: subdir/test_name.test.py + + Returns: + Set of test names that must run serially + """ + serial_tests = set() + if not serial_file.exists(): + return serial_tests + + try: + with open(serial_file) as f: + for line in f: + line = line.strip() + # Skip empty lines and comments + if not line or line.startswith('#'): + continue + # Remove .test.py extension if present + if line.endswith('.test.py'): + line = line[:-8] # Remove .test.py + # Extract just the test name from path + test_name = Path(line).stem.replace('.test', '') + serial_tests.add(test_name) + except IOError: + pass + + return serial_tests + + +def load_timings(timing_file: Path) -> Dict[str, float]: + """Load test timing data from JSON file.""" + if timing_file.exists(): + try: + with open(timing_file) as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + pass + return {} + + +def save_timings(timing_file: Path, timings: Dict[str, float]): + """Save test timing data to JSON file.""" + try: + with open(timing_file, 'w') as f: + json.dump(timings, f, indent=2, sort_keys=True) + except IOError as e: + print(f"Warning: Could not save timings: {e}", file=sys.stderr) + + def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]: """ Partition tests into roughly equal groups for parallel execution. + Simple round-robin partitioning (used when no timing data available). Args: tests: List of test names @@ -103,6 +170,62 @@ def partition_tests(tests: List[str], num_jobs: int) -> List[List[str]]: return [p for p in partitions if p] # Remove empty partitions +def partition_tests_by_time( + tests: List[str], + num_jobs: int, + timings: Dict[str, float] +) -> Tuple[List[List[str]], List[float]]: + """ + Partition tests using LPT (Longest Processing Time first) algorithm. + This balances the load across workers based on expected test duration. + + Args: + tests: List of test names + num_jobs: Number of parallel workers + timings: Dictionary of test name -> expected duration in seconds + + Returns: + Tuple of (partitions, expected_durations) where: + - partitions: List of test lists, one per worker + - expected_durations: Expected total duration for each worker + """ + if num_jobs <= 0: + num_jobs = 1 + + num_workers = min(num_jobs, len(tests)) + + # Get timing for each test, use default for unknown + test_times = [] + unknown_tests = [] + for test in tests: + if test in timings: + test_times.append((test, timings[test])) + else: + unknown_tests.append(test) + + # Sort known tests by time (longest first) for LPT algorithm + test_times.sort(key=lambda x: x[1], reverse=True) + + # Initialize workers + partitions = [[] for _ in range(num_workers)] + worker_loads = [0.0] * num_workers + + # Assign known tests using LPT: assign to worker with least load + for test, duration in test_times: + min_worker = min(range(num_workers), key=lambda w: worker_loads[w]) + partitions[min_worker].append(test) + worker_loads[min_worker] += duration + + # Distribute unknown tests evenly across workers with least load + # Sort unknown tests and distribute them one at a time to balance + for test in unknown_tests: + min_worker = min(range(num_workers), key=lambda w: worker_loads[w]) + partitions[min_worker].append(test) + worker_loads[min_worker] += DEFAULT_TEST_TIME + + return partitions, worker_loads + + def strip_ansi(text: str) -> str: """Remove ANSI escape codes from text.""" ansi_escape = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])') @@ -111,13 +234,13 @@ def strip_ansi(text: str) -> str: def parse_autest_output(output: str) -> dict: """ - Parse autest output to extract pass/fail counts. + Parse autest output to extract pass/fail counts and per-test timings. Args: output: Raw autest output string Returns: - Dictionary with counts for passed, failed, skipped, etc. + Dictionary with counts for passed, failed, skipped, etc. and test_timings """ result = { 'passed': 0, @@ -126,15 +249,49 @@ def parse_autest_output(output: str) -> dict: 'warnings': 0, 'exceptions': 0, 'unknown': 0, - 'failed_tests': [] + 'failed_tests': [], + 'test_timings': {} } # Strip ANSI codes for easier parsing clean_output = strip_ansi(output) + lines = clean_output.split('\n') + + # Track test start times to calculate duration + # Autest output format: + # Running Test: test_name + # ... test output ... + # Test: test_name: Passed/Failed + current_test = None + test_start_line = None + + # First pass: find test results and their line positions + test_results = [] # (line_num, test_name, result) + for i, line in enumerate(lines): + line_stripped = line.strip() + + # Match "Running Test: test_name" or "Test: test_name: Passed/Failed" + running_match = re.match(r'Running Test:\s+(\S+)', line_stripped) + result_match = re.match(r'Test:\s+(\S+):\s+(Passed|Failed|Skipped)', line_stripped, re.IGNORECASE) + + if running_match: + test_results.append((i, running_match.group(1), 'start')) + elif result_match: + test_results.append((i, result_match.group(1), result_match.group(2).lower())) + + # Calculate per-test timing based on line positions + # (rough approximation - actual timing would be better from autest if available) + for i, (line_num, test_name, status) in enumerate(test_results): + if status == 'start': + # Find the corresponding end + for j in range(i + 1, len(test_results)): + end_line, end_name, end_status = test_results[j] + if end_name == test_name and end_status != 'start': + # We don't have actual time, but we'll track it from the worker + break # Parse the summary section - # Format: " Passed: 2" or " Failed: 0" - for line in clean_output.split('\n'): + for line in lines: line = line.strip() if 'Passed:' in line: try: @@ -168,7 +325,6 @@ def parse_autest_output(output: str) -> dict: pass # Extract failed test names - # Look for lines like "Test: test_name: Failed" failed_pattern = re.compile(r'Test:\s+(\S+):\s+Failed', re.IGNORECASE) for match in failed_pattern.finditer(clean_output): result['failed_tests'].append(match.group(1)) @@ -176,6 +332,50 @@ def parse_autest_output(output: str) -> dict: return result +def run_single_test( + test: str, + script_dir: Path, + sandbox: Path, + ats_bin: str, + extra_args: List[str], + env: dict +) -> Tuple[str, float, bool, str]: + """ + Run a single test and return its timing. + + Returns: + Tuple of (test_name, duration, passed, output) + """ + cmd = [ + 'uv', 'run', 'autest', 'run', + '--directory', 'gold_tests', + '--ats-bin', ats_bin, + '--sandbox', str(sandbox / test), + '--filters', test + ] + cmd.extend(extra_args) + + start = time.time() + try: + proc = subprocess.run( + cmd, + cwd=script_dir, + capture_output=True, + text=True, + env=env, + timeout=600 # 10 minute timeout per test + ) + duration = time.time() - start + output = proc.stdout + proc.stderr + parsed = parse_autest_output(output) + passed = parsed['failed'] == 0 and parsed['exceptions'] == 0 + return (test, duration, passed, output) + except subprocess.TimeoutExpired: + return (test, 600.0, False, "TIMEOUT") + except Exception as e: + return (test, time.time() - start, False, str(e)) + + def run_worker( worker_id: int, tests: List[str], @@ -184,7 +384,8 @@ def run_worker( ats_bin: str, extra_args: List[str], port_offset_step: int = 1000, - verbose: bool = False + verbose: bool = False, + collect_timings: bool = False ) -> TestResult: """ Run autest on a subset of tests with isolated sandbox and port range. @@ -198,9 +399,10 @@ def run_worker( extra_args: Additional arguments to pass to autest port_offset_step: Port offset between workers verbose: Whether to print verbose output + collect_timings: If True, run tests one at a time to collect accurate timing Returns: - TestResult with pass/fail counts + TestResult with pass/fail counts and per-test timings """ start_time = time.time() result = TestResult(worker_id=worker_id, tests=tests) @@ -212,67 +414,90 @@ def run_worker( # Calculate port offset for this worker port_offset = worker_id * port_offset_step - # Build autest command - # Use 'uv run autest' directly for better compatibility - cmd = [ - 'uv', 'run', 'autest', 'run', - '--directory', 'gold_tests', - '--ats-bin', ats_bin, - '--sandbox', str(sandbox), - ] - - # Add test filters - cmd.append('--filters') - cmd.extend(tests) - - # Add any extra arguments - cmd.extend(extra_args) - # Set up environment with port offset - # We set this as an actual OS environment variable so ports.py can read it env = os.environ.copy() env['AUTEST_PORT_OFFSET'] = str(port_offset) - if verbose: - print(f"[Worker {worker_id}] Running {len(tests)} tests with port offset {port_offset}") - print(f"[Worker {worker_id}] Command: {' '.join(cmd)}") - - try: - proc = subprocess.run( - cmd, - cwd=script_dir, - capture_output=True, - text=True, - env=env, - timeout=3600 # 1 hour timeout per worker - ) - result.output = proc.stdout + proc.stderr - result.return_code = proc.returncode - - # Parse results - parsed = parse_autest_output(result.output) - result.passed = parsed['passed'] - result.failed = parsed['failed'] - result.skipped = parsed['skipped'] - result.warnings = parsed['warnings'] - result.exceptions = parsed['exceptions'] - result.unknown = parsed['unknown'] - result.failed_tests = parsed['failed_tests'] - - except subprocess.TimeoutExpired: - result.output = "TIMEOUT: Worker exceeded 1 hour timeout" - result.return_code = -1 - result.failed = len(tests) - except Exception as e: - result.output = f"ERROR: {str(e)}" - result.return_code = -1 - result.failed = len(tests) + if collect_timings: + # Run tests one at a time to collect accurate timing + all_output = [] + total_tests = len(tests) + for idx, test in enumerate(tests, 1): + test_name, duration, passed, output = run_single_test( + test, script_dir, sandbox, ats_bin, extra_args, env + ) + result.test_timings[test_name] = duration + all_output.append(output) + + if passed: + result.passed += 1 + else: + result.failed += 1 + result.failed_tests.append(test_name) + + status = "PASS" if passed else "FAIL" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + # Fixed-width format: date time status duration worker progress test_name + print(f"{timestamp} {status:4s} {duration:6.1f}s Worker:{worker_id:2d} {idx:2d}/{total_tests:2d} {test}", flush=True) + + result.output = "\n".join(all_output) + result.return_code = 0 if result.failed == 0 else 1 + else: + # Run all tests in batch (faster but no per-test timing) + cmd = [ + 'uv', 'run', 'autest', 'run', + '--directory', 'gold_tests', + '--ats-bin', ats_bin, + '--sandbox', str(sandbox), + ] + + # Add test filters + cmd.append('--filters') + cmd.extend(tests) + + # Add any extra arguments + cmd.extend(extra_args) + + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"{timestamp} Worker:{worker_id:2d} Starting batch of {len(tests)} tests (port offset {port_offset})", flush=True) + + try: + proc = subprocess.run( + cmd, + cwd=script_dir, + capture_output=True, + text=True, + env=env, + timeout=3600 # 1 hour timeout per worker + ) + result.output = proc.stdout + proc.stderr + result.return_code = proc.returncode + + # Parse results + parsed = parse_autest_output(result.output) + result.passed = parsed['passed'] + result.failed = parsed['failed'] + result.skipped = parsed['skipped'] + result.warnings = parsed['warnings'] + result.exceptions = parsed['exceptions'] + result.unknown = parsed['unknown'] + result.failed_tests = parsed['failed_tests'] + + except subprocess.TimeoutExpired: + result.output = "TIMEOUT: Worker exceeded 1 hour timeout" + result.return_code = -1 + result.failed = len(tests) + except Exception as e: + result.output = f"ERROR: {str(e)}" + result.return_code = -1 + result.failed = len(tests) result.duration = time.time() - start_time return result -def print_summary(results: List[TestResult], total_duration: float): +def print_summary(results: List[TestResult], total_duration: float, + expected_timings: Optional[Dict[str, float]] = None): """Print aggregated results from all workers.""" total_passed = sum(r.passed for r in results) total_failed = sum(r.failed for r in results) @@ -286,6 +511,11 @@ def print_summary(results: List[TestResult], total_duration: float): for r in results: all_failed_tests.extend(r.failed_tests) + # Collect actual timings from results + actual_timings = {} + for r in results: + actual_timings.update(r.test_timings) + print("\n" + "=" * 70) print("PARALLEL AUTEST SUMMARY") print("=" * 70) @@ -306,14 +536,43 @@ def print_summary(results: List[TestResult], total_duration: float): for test in sorted(all_failed_tests): print(f" - {test}") + # Check for timing discrepancies + if expected_timings and actual_timings: + timing_warnings = [] + for test, actual in actual_timings.items(): + if test in expected_timings: + expected = expected_timings[test] + if expected > 0: + ratio = actual / expected + diff = abs(actual - expected) + # Flag if: (>2x ratio AND >10s diff) OR (>30s diff regardless of ratio) + if ((ratio > 2.0 or ratio < 0.5) and diff > 10) or diff > 30: + timing_warnings.append((test, expected, actual, ratio)) + + if timing_warnings: + print("-" * 70) + print("TIMING DISCREPANCIES (expected vs actual):") + for test, expected, actual, ratio in sorted(timing_warnings, key=lambda x: -abs(x[2] - x[1])): + direction = "slower" if actual > expected else "faster" + print(f" {test}: {expected:.1f}s -> {actual:.1f}s ({ratio:.1f}x {direction})") + print("=" * 70) # Per-worker summary print("\nPer-worker breakdown:") for r in results: status = "OK" if r.failed == 0 and r.exceptions == 0 else "FAIL" - print(f" Worker {r.worker_id}: {r.passed} passed, {r.failed} failed, " - f"{r.skipped} skipped ({r.duration:.1f}s) [{status}]") + if r.is_serial: + worker_label = " Serial: " + else: + worker_label = f" Worker:{r.worker_id:2d}" + print(f"{worker_label} {r.passed:3d} passed, {r.failed:3d} failed, " + f"{r.skipped:3d} skipped ({r.duration:6.1f}s) [{status}]") + + # Total summary line + print("-" * 70) + print(f" TOTAL: {total_passed:3d} passed, {total_failed:3d} failed, " + f"{total_skipped:3d} skipped ({total_duration:6.1f}s)") def main(): @@ -330,6 +589,12 @@ Examples: # List tests without running %(prog)s --list --ats-bin /opt/ats/bin + + # Collect timing data (runs tests one at a time for accurate timing) + %(prog)s -j 4 --collect-timings --ats-bin /opt/ats/bin --sandbox /tmp/autest + + # Use saved timing data for load-balanced partitioning + %(prog)s -j 16 --timings-file test-timings.json --ats-bin /opt/ats/bin --sandbox /tmp/autest ''' ) @@ -376,6 +641,33 @@ Examples: default='gold_tests', help='Test directory relative to script location (default: gold_tests)' ) + parser.add_argument( + '--collect-timings', + action='store_true', + help='Run tests one at a time to collect accurate per-test timing data' + ) + parser.add_argument( + '--timings-file', + type=Path, + default=DEFAULT_TIMING_FILE, + help=f'Path to timing data JSON file (default: {DEFAULT_TIMING_FILE})' + ) + parser.add_argument( + '--no-timing', + action='store_true', + help='Disable timing-based load balancing (use round-robin partitioning)' + ) + parser.add_argument( + '--serial-tests-file', + type=Path, + default=DEFAULT_SERIAL_TESTS_FILE, + help=f'Path to file listing tests that must run serially (default: {DEFAULT_SERIAL_TESTS_FILE})' + ) + parser.add_argument( + '--no-serial', + action='store_true', + help='Skip serial tests entirely' + ) parser.add_argument( 'extra_args', nargs='*', @@ -392,28 +684,68 @@ Examples: print(f"Error: Test directory not found: {test_dir}", file=sys.stderr) sys.exit(1) + # Load serial tests list + serial_tests = load_serial_tests(args.serial_tests_file) + if serial_tests: + print(f"Loaded {len(serial_tests)} serial tests from {args.serial_tests_file}") + # Discover tests - tests = discover_tests(test_dir, args.filters) + all_tests = discover_tests(test_dir, args.filters) - if not tests: + if not all_tests: print("No tests found matching the specified filters.", file=sys.stderr) sys.exit(1) - print(f"Found {len(tests)} tests") + # Separate parallel and serial tests + parallel_tests = [t for t in all_tests if t not in serial_tests] + serial_tests_to_run = [t for t in all_tests if t in serial_tests] + + print(f"Found {len(all_tests)} tests ({len(parallel_tests)} parallel, {len(serial_tests_to_run)} serial)") if args.list: - print("\nTests:") - for test in tests: + print("\nParallel tests:") + for test in parallel_tests: print(f" {test}") + if serial_tests_to_run: + print("\nSerial tests (will run after parallel tests):") + for test in serial_tests_to_run: + print(f" {test}") sys.exit(0) - # Partition tests - num_jobs = min(args.jobs, len(tests)) - partitions = partition_tests(tests, num_jobs) - - print(f"Running with {len(partitions)} parallel workers") + # Load existing timing data + timings = {} + if not args.no_timing: + timings = load_timings(args.timings_file) + if timings: + known_tests = sum(1 for t in parallel_tests if t in timings) + print(f"Loaded timing data for {known_tests}/{len(parallel_tests)} parallel tests from {args.timings_file}") + + # Partition parallel tests + num_jobs = min(args.jobs, len(parallel_tests)) if parallel_tests else 0 + + if parallel_tests: + if timings and not args.no_timing: + # Use timing-based load balancing (LPT algorithm) + partitions, expected_loads = partition_tests_by_time(parallel_tests, num_jobs, timings) + print(f"Using timing-based load balancing") + if args.verbose: + for i, load in enumerate(expected_loads): + print(f" Worker {i}: {len(partitions[i])} tests, ~{load:.1f}s expected") + else: + # Fall back to simple round-robin partitioning + partitions = partition_tests(parallel_tests, num_jobs) + print(f"Using round-robin partitioning") + else: + partitions = [] + + if partitions: + print(f"Running with {len(partitions)} parallel workers") print(f"Port offset step: {args.port_offset_step}") print(f"Sandbox: {args.sandbox}") + if args.collect_timings: + print("Collecting per-test timing data (tests run sequentially per worker)") + if serial_tests_to_run and not args.no_serial: + print(f"Serial tests will run after parallel tests complete ({len(serial_tests_to_run)} tests)") # Create sandbox base directory sandbox_base = Path(args.sandbox) @@ -423,47 +755,101 @@ Examples: start_time = time.time() results: List[TestResult] = [] - with ProcessPoolExecutor(max_workers=len(partitions)) as executor: - futures = {} - for worker_id, worker_tests in enumerate(partitions): - future = executor.submit( - run_worker, - worker_id=worker_id, - tests=worker_tests, - script_dir=script_dir, - sandbox_base=sandbox_base, - ats_bin=args.ats_bin, - extra_args=args.extra_args or [], - port_offset_step=args.port_offset_step, - verbose=args.verbose + if partitions: + with ProcessPoolExecutor(max_workers=len(partitions)) as executor: + futures = {} + for worker_id, worker_tests in enumerate(partitions): + future = executor.submit( + run_worker, + worker_id=worker_id, + tests=worker_tests, + script_dir=script_dir, + sandbox_base=sandbox_base, + ats_bin=args.ats_bin, + extra_args=args.extra_args or [], + port_offset_step=args.port_offset_step, + verbose=args.verbose, + collect_timings=args.collect_timings + ) + futures[future] = worker_id + + # Collect results as they complete + for future in as_completed(futures): + worker_id = futures[future] + try: + result = future.result() + results.append(result) + status = "PASS" if result.failed == 0 else "FAIL" + print(f"[Worker {worker_id}] Completed: {result.passed} passed, " + f"{result.failed} failed ({result.duration:.1f}s) [{status}]") + except Exception as e: + print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr) + results.append(TestResult( + worker_id=worker_id, + tests=partitions[worker_id], + failed=len(partitions[worker_id]), + output=str(e) + )) + + # Run serial tests after parallel tests complete + if serial_tests_to_run and not args.no_serial: + print(f"\n{'=' * 70}") + print("RUNNING SERIAL TESTS") + print(f"{'=' * 70}") + serial_start = time.time() + + # Use a special worker ID for serial tests (after parallel workers) + serial_worker_id = len(partitions) if partitions else 0 + + # Set up environment without port offset (serial tests run alone) + env = os.environ.copy() + env['AUTEST_PORT_OFFSET'] = '0' + + serial_result = TestResult(worker_id=serial_worker_id, tests=serial_tests_to_run, is_serial=True) + + for idx, test in enumerate(serial_tests_to_run, 1): + test_name, duration, passed, output = run_single_test( + test, script_dir, sandbox_base / "serial", args.ats_bin, + args.extra_args or [], env ) - futures[future] = worker_id + serial_result.test_timings[test_name] = duration - # Collect results as they complete - for future in as_completed(futures): - worker_id = futures[future] - try: - result = future.result() - results.append(result) - status = "PASS" if result.failed == 0 else "FAIL" - print(f"[Worker {worker_id}] Completed: {result.passed} passed, " - f"{result.failed} failed ({result.duration:.1f}s) [{status}]") - except Exception as e: - print(f"[Worker {worker_id}] Error: {e}", file=sys.stderr) - results.append(TestResult( - worker_id=worker_id, - tests=partitions[worker_id], - failed=len(partitions[worker_id]), - output=str(e) - )) + if passed: + serial_result.passed += 1 + else: + serial_result.failed += 1 + serial_result.failed_tests.append(test_name) + + status = "PASS" if passed else "FAIL" + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + print(f"{timestamp} {status:4s} {duration:6.1f}s Serial {idx:2d}/{len(serial_tests_to_run):2d} {test}", flush=True) + + serial_result.duration = time.time() - serial_start + results.append(serial_result) + + print(f"\n[Serial] Completed: {serial_result.passed} passed, " + f"{serial_result.failed} failed ({serial_result.duration:.1f}s)") total_duration = time.time() - start_time # Sort results by worker_id for consistent output results.sort(key=lambda r: r.worker_id) - # Print summary - print_summary(results, total_duration) + # Collect and save timing data if collected + if args.collect_timings: + new_timings = dict(timings) # Start with existing timings + tests_timed = 0 + for r in results: + for test_name, duration in r.test_timings.items(): + new_timings[test_name] = duration + tests_timed += 1 + if tests_timed > 0: + save_timings(args.timings_file, new_timings) + print(f"\nSaved timing data for {tests_timed} tests to {args.timings_file}") + print(f"Total tests in timing database: {len(new_timings)}") + + # Print summary (pass expected timings for discrepancy check) + print_summary(results, total_duration, timings if args.collect_timings else None) # Exit with non-zero if any tests failed total_failed = sum(r.failed + r.exceptions for r in results) diff --git a/tests/gold_tests/autest-site/ports.py b/tests/gold_tests/autest-site/ports.py index 6e402ca0ea..adb28cc3f2 100644 --- a/tests/gold_tests/autest-site/ports.py +++ b/tests/gold_tests/autest-site/ports.py @@ -198,9 +198,10 @@ def _setup_port_queue(amount=1000): g_ports.put(port) port += 1 if rmin > amount and g_ports.qsize() < amount: - port = 2001 # Fill in more ports, starting at 2001, well above well known ports, # and going up until the minimum port range used by the OS. + # Add port_offset to support parallel test execution (same as high range). + port = 2001 + port_offset while port < dmin and g_ports.qsize() < amount: if PortOpen(port, listening_ports=listening_ports): host.WriteDebug('_setup_port_queue', f"Rejecting an already open port: {port}") diff --git a/tests/gold_tests/basic/config.test.py b/tests/gold_tests/basic/config.test.py index 1cb2bf8bc7..c78769465c 100644 --- a/tests/gold_tests/basic/config.test.py +++ b/tests/gold_tests/basic/config.test.py @@ -18,8 +18,7 @@ Test.Summary = "Test start up of Traffic server with configuration modification of starting port" -ts = Test.MakeATSProcess("ts", select_ports=False) -ts.Variables.port = 8090 +ts = Test.MakeATSProcess("ts") ts.Disk.records_config.update({ 'proxy.config.http.server_ports': str(ts.Variables.port) + f" {ts.Variables.uds_path}", }) diff --git a/tests/gold_tests/basic/copy_config.test.py b/tests/gold_tests/basic/copy_config.test.py index ea08732be0..38961f8a85 100644 --- a/tests/gold_tests/basic/copy_config.test.py +++ b/tests/gold_tests/basic/copy_config.test.py @@ -18,16 +18,14 @@ Test.Summary = "Test start up of Traffic server with configuration modification of starting port of different servers at the same time" -# set up some ATS processes -ts1 = Test.MakeATSProcess("ts1", select_ports=False) -ts1.Variables.port = 8090 +# set up some ATS processes with dynamic port selection +ts1 = Test.MakeATSProcess("ts1") ts1.Disk.records_config.update({ 'proxy.config.http.server_ports': str(ts1.Variables.port) + f" {ts1.Variables.uds_path}", }) ts1.Ready = When.PortOpen(ts1.Variables.port) -ts2 = Test.MakeATSProcess("ts2", select_ports=False, enable_uds=False) -ts2.Variables.port = 8091 +ts2 = Test.MakeATSProcess("ts2", enable_uds=False) ts2.Disk.records_config.update({ 'proxy.config.http.server_ports': str(ts2.Variables.port), }) diff --git a/tests/serial_tests.txt b/tests/serial_tests.txt new file mode 100644 index 0000000000..43675a3b9d --- /dev/null +++ b/tests/serial_tests.txt @@ -0,0 +1,14 @@ +# Tests that must run serially (not in parallel) due to hardcoded ports or +# other constraints that prevent parallel execution. +# +# Format: One test file path per line, relative to tests/gold_tests/ +# Lines starting with # are comments and ignored. + +# Tests that intentionally use select_ports=False for specific port testing +tls/tls_sni_with_port.test.py + +# Tests using select_ports=False without dynamic ports +redirect/redirect_to_same_origin_on_cache.test.py + +# Tests with hardcoded ports that are difficult to make dynamic +parent_proxy/parent-retry.test.py
