This is an automated email from the ASF dual-hosted git repository. sbp pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit d12363361166200f3b00c64c054edd89e524bccd Author: Sean B. Palmer <[email protected]> AuthorDate: Mon Jan 12 20:04:20 2026 +0000 Split apart some RAT check functions --- atr/tasks/checks/rat.py | 482 ++++++++++++++++++++++-------------------- tests/unit/test_checks_rat.py | 24 +-- 2 files changed, 267 insertions(+), 239 deletions(-) diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py index 88772d8..cf0ba87 100644 --- a/atr/tasks/checks/rat.py +++ b/atr/tasks/checks/rat.py @@ -70,6 +70,10 @@ _STD_EXCLUSIONS_EXTENDED: Final[list[str]] = [ ] +class RatError(RuntimeError): + pass + + async def check(args: checks.FunctionArguments) -> results.Results | None: """Use Apache RAT to check the licenses of the files in the artifact.""" recorder = await args.recorder() @@ -155,7 +159,7 @@ async def _check_core( policy_excludes: list[str], ) -> None: result = await asyncio.to_thread( - _check_core_logic, + _synchronous, artifact_path=str(artifact_abs_path), policy_excludes=policy_excludes, rat_jar_path=args.extra_args.get("rat_jar_path", _CONFIG.APACHE_RAT_JAR_PATH), @@ -166,7 +170,6 @@ async def _check_core( # Record individual file failures before the overall result for file in result.unknown_license_files: await recorder.failure("Unknown license", None, member_rel_path=file.name) - for file in result.unapproved_files: await recorder.failure("Unapproved license", {"license": file.license}, member_rel_path=file.name) @@ -181,148 +184,6 @@ async def _check_core( await recorder.success(result.message, result_data) -def _check_core_logic( # noqa: C901 - artifact_path: str, - policy_excludes: list[str], - rat_jar_path: str = _CONFIG.APACHE_RAT_JAR_PATH, - max_extract_size: int = _CONFIG.MAX_EXTRACT_SIZE, - chunk_size: int = _CONFIG.EXTRACT_CHUNK_SIZE, -) -> checkdata.Rat: - """Verify license headers using Apache RAT.""" - log.info(f"Verifying licenses with Apache RAT for {artifact_path}") - log.info(f"PATH environment variable: {os.environ.get('PATH', 'PATH not found')}") - - java_check = _check_java_installed() - if java_check is not None: - return java_check - - # Verify RAT JAR exists and is accessible - rat_jar_path, jar_error = _check_core_logic_jar_exists(rat_jar_path) - if jar_error: - return jar_error - - try: - # Create a temporary directory for extraction - # TODO: We could extract to somewhere in "state/" instead - with tempfile.TemporaryDirectory(prefix="rat_verify_") as temp_dir: - log.info(f"Created temporary directory: {temp_dir}") - - # Extract the archive to the temporary directory - log.info(f"Extracting {artifact_path} to {temp_dir}") - extracted_size, exclude_file_paths = archives.extract( - artifact_path, - temp_dir, - max_size=max_extract_size, - chunk_size=chunk_size, - track_files={_RAT_EXCLUDES_FILENAME}, - ) - log.info(f"Extracted {extracted_size} bytes") - log.info(f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME} file(s): {exclude_file_paths}") - - # Validate that we found at most one exclusion file - if len(exclude_file_paths) > 1: - log.error(f"Multiple {_RAT_EXCLUDES_FILENAME} files found: {exclude_file_paths}") - return checkdata.Rat( - message=f"Multiple {_RAT_EXCLUDES_FILENAME} files not allowed (found {len(exclude_file_paths)})", - errors=[f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME} files"], - ) - - # Narrow to single path after validation - archive_excludes_path: str | None = exclude_file_paths[0] if exclude_file_paths else None - - # Determine excludes_source and effective excludes file - excludes_source: str - effective_excludes_path: str | None - - if archive_excludes_path is not None: - excludes_source = "archive" - effective_excludes_path = archive_excludes_path - log.info(f"Using archive {_RAT_EXCLUDES_FILENAME}: {archive_excludes_path}") - elif policy_excludes: - excludes_source = "policy" - policy_excludes_file = os.path.join(temp_dir, _POLICY_EXCLUDES_FILENAME) - with open(policy_excludes_file, "w") as f: - f.write("\n".join(policy_excludes)) - effective_excludes_path = os.path.relpath(policy_excludes_file, temp_dir) - log.info(f"Using policy excludes written to: {policy_excludes_file}") - else: - excludes_source = "none" - effective_excludes_path = None - log.info("No excludes: using defaults only") - - # Determine scan root based on archive .rat-excludes location - if archive_excludes_path is not None: - scan_root = os.path.dirname(os.path.join(temp_dir, archive_excludes_path)) - - # Verify that scan_root is inside temp_dir - abs_scan_root = os.path.abspath(scan_root) - abs_temp_dir = os.path.abspath(temp_dir) - scan_root_is_inside = (abs_scan_root == abs_temp_dir) or abs_scan_root.startswith(abs_temp_dir + os.sep) - if not scan_root_is_inside: - log.error(f"Scan root {scan_root} is outside temp_dir {temp_dir}") - return checkdata.Rat( - message="Invalid archive structure: exclusion file path escapes extraction directory", - errors=["Exclusion file path escapes extraction directory"], - excludes_source=excludes_source, - ) - - log.info(f"Using {_RAT_EXCLUDES_FILENAME} directory as scan root: {scan_root}") - - untracked_count = _count_files_outside_directory(temp_dir, scan_root) - if untracked_count > 0: - log.error(f"Found {untracked_count} file(s) outside {_RAT_EXCLUDES_FILENAME} directory") - return checkdata.Rat( - message=f"Files exist outside {_RAT_EXCLUDES_FILENAME} directory ({untracked_count} found)", - errors=[f"{untracked_count} file(s) outside {_RAT_EXCLUDES_FILENAME} directory"], - excludes_source=excludes_source, - ) - else: - scan_root = temp_dir - log.info(f"No archive {_RAT_EXCLUDES_FILENAME} found, using temp_dir as scan root: {scan_root}") - - # Execute RAT and get results or error - # Extended std exclusions apply when there's no archive .rat-excludes - apply_extended_std = excludes_source != "archive" - error_result, xml_output_path = _check_core_logic_execute_rat( - rat_jar_path, scan_root, temp_dir, effective_excludes_path, apply_extended_std, excludes_source - ) - if error_result is not None: - error_result.excludes_source = excludes_source - error_result.extended_std_applied = apply_extended_std - return error_result - - # Parse the XML output - log.info(f"Parsing RAT XML output: {xml_output_path}") - # Make sure xml_output_path is not None before parsing - if xml_output_path is None: - raise ValueError("XML output path is None") - - result = _check_core_logic_parse_output(xml_output_path, scan_root) - log.info(f"Successfully parsed RAT output with {util.plural(result.total_files, 'file')}") - - # The unknown_license_files and unapproved_files contain FileEntry objects - # The path is relative to scan_root, so we prepend the scan_root relative path - scan_root_rel = os.path.relpath(scan_root, temp_dir) - if scan_root_rel != ".": - for file in result.unknown_license_files: - file.name = os.path.join(scan_root_rel, os.path.normpath(file.name)) - for file in result.unapproved_files: - file.name = os.path.join(scan_root_rel, os.path.normpath(file.name)) - - result.excludes_source = excludes_source - result.extended_std_applied = apply_extended_std - return result - - except Exception as e: - import traceback - - log.exception("Error running Apache RAT") - return checkdata.Rat( - message=f"Failed to run Apache RAT: {e!s}", - errors=[str(e), traceback.format_exc()], - ) - - def _check_core_logic_execute_rat( rat_jar_path: str, scan_root: str, @@ -425,53 +286,10 @@ def _check_core_logic_execute_rat( return None, xml_output_path -def _check_core_logic_jar_exists(rat_jar_path: str) -> tuple[str, checkdata.Rat | None]: - """Verify that the Apache RAT JAR file exists and is accessible.""" - # Check that the RAT JAR exists - if not os.path.exists(rat_jar_path): - log.error(f"Apache RAT JAR not found at: {rat_jar_path}") - # Try a few common locations: - # ./rat.jar - # ./state/rat.jar - # ../rat.jar - # ../state/rat.jar - # NOTE: We're also doing something like this in task_verify_rat_license - # Should probably decide one place to do it, and do it well - alternative_paths = [ - os.path.join(os.getcwd(), os.path.basename(rat_jar_path)), - os.path.join(os.getcwd(), "state", os.path.basename(rat_jar_path)), - os.path.join(os.path.dirname(os.getcwd()), os.path.basename(rat_jar_path)), - os.path.join(os.path.dirname(os.getcwd()), "state", os.path.basename(rat_jar_path)), - ] - - for alt_path in alternative_paths: - if os.path.exists(alt_path): - log.info(f"Found alternative RAT JAR at: {alt_path}") - rat_jar_path = alt_path - break - - # Double check whether we found the JAR - if not os.path.exists(rat_jar_path): - log.error("Tried alternative paths but Apache RAT JAR still not found") - log.error(f"Current directory: {os.getcwd()}") - log.error(f"Directory contents: {os.listdir(os.getcwd())}") - if os.path.exists("state"): - log.error(f"State directory contents: {os.listdir('state')}") - - return rat_jar_path, checkdata.Rat( - message=f"Apache RAT JAR not found at: {rat_jar_path}", - errors=[f"Missing JAR: {rat_jar_path}"], - ) - else: - log.info(f"Found Apache RAT JAR at: {rat_jar_path}") - - return rat_jar_path, None - - -def _check_core_logic_parse_output(xml_file: str, base_dir: str) -> checkdata.Rat: +def _synchronous_extract_parse_output(xml_file: str, base_dir: str) -> checkdata.Rat: """Parse the XML output from Apache RAT safely.""" try: - return _check_core_logic_parse_output_core(xml_file, base_dir) + return _synchronous_extract_parse_output_core(xml_file, base_dir) except Exception as e: log.error(f"Error parsing RAT output: {e}") return checkdata.Rat( @@ -480,7 +298,7 @@ def _check_core_logic_parse_output(xml_file: str, base_dir: str) -> checkdata.Ra ) -def _check_core_logic_parse_output_core(xml_file: str, base_dir: str) -> checkdata.Rat: +def _synchronous_extract_parse_output_core(xml_file: str, base_dir: str) -> checkdata.Rat: """Parse the XML output from Apache RAT.""" tree = ElementTree.parse(xml_file) root = tree.getroot() @@ -547,43 +365,6 @@ def _check_core_logic_parse_output_core(xml_file: str, base_dir: str) -> checkda ) -def _check_java_installed() -> checkdata.Rat | None: - # Check that Java is installed - # TODO: Run this only once, when the server starts - try: - java_version = subprocess.check_output( - ["java", *_JAVA_MEMORY_ARGS, "-version"], stderr=subprocess.STDOUT, text=True - ) - log.info(f"Java version: {java_version.splitlines()[0]}") - except (subprocess.SubprocessError, FileNotFoundError) as e: - log.error(f"Java is not properly installed or not in PATH: {e}") - - # Try to get some output even if the command failed - try: - # Use run instead of check_output to avoid exceptions - java_result = subprocess.run( - ["java", *_JAVA_MEMORY_ARGS, "-version"], - stderr=subprocess.STDOUT, - stdout=subprocess.PIPE, - text=True, - check=False, - ) - log.info(f"Java command return code: {java_result.returncode}") - log.info(f"Java command output: {java_result.stdout or java_result.stderr}") - - # Try to find where Java might be located - which_java = subprocess.run(["which", "java"], capture_output=True, text=True, check=False) - which_java_result = which_java.stdout.strip() if (which_java.returncode == 0) else "not found" - log.info(f"Result for which java: {which_java_result}") - except Exception as inner_e: - log.error(f"Additional error while trying to debug java: {inner_e}") - - return checkdata.Rat( - message="Java is not properly installed or not in PATH", - errors=[f"Java error: {e}"], - ) - - def _count_files_outside_directory(temp_dir: str, scan_root: str) -> int: """Count regular files that exist outside the scan_root directory.""" count = 0 @@ -627,3 +408,250 @@ def _summary_message(valid: bool, unapproved_licenses: int, unknown_licenses: in if unknown_licenses > 0: message += f"{util.plural(unknown_licenses, 'file')} with unknown licenses" return message + + +def _synchronous( + artifact_path: str, + policy_excludes: list[str], + rat_jar_path: str = _CONFIG.APACHE_RAT_JAR_PATH, + max_extract_size: int = _CONFIG.MAX_EXTRACT_SIZE, + chunk_size: int = _CONFIG.EXTRACT_CHUNK_SIZE, +) -> checkdata.Rat: + """Verify license headers using Apache RAT.""" + log.info(f"Verifying licenses with Apache RAT for {artifact_path}") + log.info(f"PATH environment variable: {os.environ.get('PATH', 'PATH not found')}") + + java_check = _synchronous_check_java_installed() + if java_check is not None: + return java_check + + # Verify RAT JAR exists and is accessible + rat_jar_path, jar_error = _synchronous_check_jar_exists(rat_jar_path) + if jar_error: + return jar_error + + try: + # Create a temporary directory for extraction + # TODO: We could extract to somewhere in "state/" instead + with tempfile.TemporaryDirectory(prefix="rat_verify_") as temp_dir: + log.info(f"Created temporary directory: {temp_dir}") + return _synchronous_extract( + artifact_path, temp_dir, max_extract_size, chunk_size, policy_excludes, rat_jar_path + ) + except Exception as e: + import traceback + + log.exception("Error running Apache RAT") + return checkdata.Rat( + message=f"Failed to run Apache RAT: {e!s}", + errors=[str(e), traceback.format_exc()], + ) + + +def _synchronous_check_jar_exists(rat_jar_path: str) -> tuple[str, checkdata.Rat | None]: + """Verify that the Apache RAT JAR file exists and is accessible.""" + # Check that the RAT JAR exists + if not os.path.exists(rat_jar_path): + log.error(f"Apache RAT JAR not found at: {rat_jar_path}") + # Try a few common locations: + # ./rat.jar + # ./state/rat.jar + # ../rat.jar + # ../state/rat.jar + # NOTE: We're also doing something like this in task_verify_rat_license + # Should probably decide one place to do it, and do it well + alternative_paths = [ + os.path.join(os.getcwd(), os.path.basename(rat_jar_path)), + os.path.join(os.getcwd(), "state", os.path.basename(rat_jar_path)), + os.path.join(os.path.dirname(os.getcwd()), os.path.basename(rat_jar_path)), + os.path.join(os.path.dirname(os.getcwd()), "state", os.path.basename(rat_jar_path)), + ] + + for alt_path in alternative_paths: + if os.path.exists(alt_path): + log.info(f"Found alternative RAT JAR at: {alt_path}") + rat_jar_path = alt_path + break + + # Double check whether we found the JAR + if not os.path.exists(rat_jar_path): + log.error("Tried alternative paths but Apache RAT JAR still not found") + log.error(f"Current directory: {os.getcwd()}") + log.error(f"Directory contents: {os.listdir(os.getcwd())}") + if os.path.exists("state"): + log.error(f"State directory contents: {os.listdir('state')}") + + return rat_jar_path, checkdata.Rat( + message=f"Apache RAT JAR not found at: {rat_jar_path}", + errors=[f"Missing JAR: {rat_jar_path}"], + ) + else: + log.info(f"Found Apache RAT JAR at: {rat_jar_path}") + + return rat_jar_path, None + + +def _synchronous_check_java_installed() -> checkdata.Rat | None: + # Check that Java is installed + # TODO: Run this only once, when the server starts + try: + java_version = subprocess.check_output( + ["java", *_JAVA_MEMORY_ARGS, "-version"], stderr=subprocess.STDOUT, text=True + ) + log.info(f"Java version: {java_version.splitlines()[0]}") + except (subprocess.SubprocessError, FileNotFoundError) as e: + log.error(f"Java is not properly installed or not in PATH: {e}") + + # Try to get some output even if the command failed + try: + # Use run instead of check_output to avoid exceptions + java_result = subprocess.run( + ["java", *_JAVA_MEMORY_ARGS, "-version"], + stderr=subprocess.STDOUT, + stdout=subprocess.PIPE, + text=True, + check=False, + ) + log.info(f"Java command return code: {java_result.returncode}") + log.info(f"Java command output: {java_result.stdout or java_result.stderr}") + + # Try to find where Java might be located + which_java = subprocess.run(["which", "java"], capture_output=True, text=True, check=False) + which_java_result = which_java.stdout.strip() if (which_java.returncode == 0) else "not found" + log.info(f"Result for which java: {which_java_result}") + except Exception as inner_e: + log.error(f"Additional error while trying to debug java: {inner_e}") + + return checkdata.Rat( + message="Java is not properly installed or not in PATH", + errors=[f"Java error: {e}"], + ) + + +def _synchronous_extract( + artifact_path: str, + temp_dir: str, + max_extract_size: int, + chunk_size: int, + policy_excludes: list[str], + rat_jar_path: str, +) -> checkdata.Rat: + # Extract the archive to the temporary directory + log.info(f"Extracting {artifact_path} to {temp_dir}") + extracted_size, exclude_file_paths = archives.extract( + artifact_path, + temp_dir, + max_size=max_extract_size, + chunk_size=chunk_size, + track_files={_RAT_EXCLUDES_FILENAME}, + ) + log.info(f"Extracted {extracted_size} bytes") + log.info(f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME} file(s): {exclude_file_paths}") + + # Validate that we found at most one exclusion file + if len(exclude_file_paths) > 1: + log.error(f"Multiple {_RAT_EXCLUDES_FILENAME} files found: {exclude_file_paths}") + return checkdata.Rat( + message=f"Multiple {_RAT_EXCLUDES_FILENAME} files not allowed (found {len(exclude_file_paths)})", + errors=[f"Found {len(exclude_file_paths)} {_RAT_EXCLUDES_FILENAME} files"], + ) + + # Narrow to single path after validation + archive_excludes_path: str | None = exclude_file_paths[0] if exclude_file_paths else None + + excludes_source, effective_excludes_path = _synchronous_extract_excludes_source( + archive_excludes_path, policy_excludes, temp_dir + ) + + try: + scan_root = _synchronous_extract_scan_root(archive_excludes_path, temp_dir) + except RatError as e: + return checkdata.Rat( + message=f"Failed to determine scan root: {e}", + errors=[str(e)], + excludes_source=excludes_source, + ) + + # Execute RAT and get results or error + # Extended std exclusions apply when there's no archive .rat-excludes + apply_extended_std = excludes_source != "archive" + error_result, xml_output_path = _check_core_logic_execute_rat( + rat_jar_path, scan_root, temp_dir, effective_excludes_path, apply_extended_std, excludes_source + ) + if error_result is not None: + error_result.excludes_source = excludes_source + error_result.extended_std_applied = apply_extended_std + return error_result + + # Parse the XML output + log.info(f"Parsing RAT XML output: {xml_output_path}") + # Make sure xml_output_path is not None before parsing + if xml_output_path is None: + raise ValueError("XML output path is None") + + result = _synchronous_extract_parse_output(xml_output_path, scan_root) + log.info(f"Successfully parsed RAT output with {util.plural(result.total_files, 'file')}") + + # The unknown_license_files and unapproved_files contain FileEntry objects + # The path is relative to scan_root, so we prepend the scan_root relative path + scan_root_rel = os.path.relpath(scan_root, temp_dir) + if scan_root_rel != ".": + for file in result.unknown_license_files: + file.name = os.path.join(scan_root_rel, os.path.normpath(file.name)) + for file in result.unapproved_files: + file.name = os.path.join(scan_root_rel, os.path.normpath(file.name)) + + result.excludes_source = excludes_source + result.extended_std_applied = apply_extended_std + return result + + +def _synchronous_extract_excludes_source( + archive_excludes_path: str | None, policy_excludes: list[str], temp_dir: str +) -> tuple[str, str | None]: + # Determine excludes_source and effective excludes file + excludes_source: str + effective_excludes_path: str | None + + if archive_excludes_path is not None: + excludes_source = "archive" + effective_excludes_path = archive_excludes_path + log.info(f"Using archive {_RAT_EXCLUDES_FILENAME}: {archive_excludes_path}") + elif policy_excludes: + excludes_source = "policy" + policy_excludes_file = os.path.join(temp_dir, _POLICY_EXCLUDES_FILENAME) + with open(policy_excludes_file, "w") as f: + f.write("\n".join(policy_excludes)) + effective_excludes_path = os.path.relpath(policy_excludes_file, temp_dir) + log.info(f"Using policy excludes written to: {policy_excludes_file}") + else: + excludes_source = "none" + effective_excludes_path = None + log.info("No excludes: using defaults only") + return excludes_source, effective_excludes_path + + +def _synchronous_extract_scan_root(archive_excludes_path: str | None, temp_dir: str) -> str: + # Determine scan root based on archive .rat-excludes location + if archive_excludes_path is not None: + scan_root = os.path.dirname(os.path.join(temp_dir, archive_excludes_path)) + + # Verify that scan_root is inside temp_dir + abs_scan_root = os.path.abspath(scan_root) + abs_temp_dir = os.path.abspath(temp_dir) + scan_root_is_inside = (abs_scan_root == abs_temp_dir) or abs_scan_root.startswith(abs_temp_dir + os.sep) + if not scan_root_is_inside: + log.error(f"Scan root {scan_root} is outside temp_dir {temp_dir}") + raise RatError("Invalid archive structure: exclusion file path escapes extraction directory") + + log.info(f"Using {_RAT_EXCLUDES_FILENAME} directory as scan root: {scan_root}") + + untracked_count = _count_files_outside_directory(temp_dir, scan_root) + if untracked_count > 0: + log.error(f"Found {untracked_count} file(s) outside {_RAT_EXCLUDES_FILENAME} directory") + raise RatError(f"Files exist outside {_RAT_EXCLUDES_FILENAME} directory ({untracked_count} found)") + else: + scan_root = temp_dir + log.info(f"No archive {_RAT_EXCLUDES_FILENAME} found, using temp_dir as scan root: {scan_root}") + + return scan_root diff --git a/tests/unit/test_checks_rat.py b/tests/unit/test_checks_rat.py index b47cf94..ad9f173 100644 --- a/tests/unit/test_checks_rat.py +++ b/tests/unit/test_checks_rat.py @@ -27,27 +27,27 @@ TEST_ARCHIVE = pathlib.Path(__file__).parent.parent / "e2e" / "test_files" / "ap @pytest.fixture def rat_available() -> tuple[bool, bool]: # TODO: Make this work properly in CI - java_ok = rat._check_java_installed() is None - _, jar_error = rat._check_core_logic_jar_exists(rat._CONFIG.APACHE_RAT_JAR_PATH) + java_ok = rat._synchronous_check_java_installed() is None + _, jar_error = rat._synchronous_check_jar_exists(rat._CONFIG.APACHE_RAT_JAR_PATH) jar_ok = jar_error is None return (java_ok, jar_ok) -def _skip_if_unavailable(rat_available: tuple[bool, bool]) -> None: - java_ok, jar_ok = rat_available - if not java_ok: - pytest.skip("Java not available") - if not jar_ok: - pytest.skip("RAT JAR not available") - - def test_check_includes_excludes_source_none(rat_available: tuple[bool, bool]): _skip_if_unavailable(rat_available) - result = rat._check_core_logic(str(TEST_ARCHIVE), []) + result = rat._synchronous(str(TEST_ARCHIVE), []) assert result.excludes_source == "none" def test_check_includes_excludes_source_policy(rat_available: tuple[bool, bool]): _skip_if_unavailable(rat_available) - result = rat._check_core_logic(str(TEST_ARCHIVE), ["*.py"]) + result = rat._synchronous(str(TEST_ARCHIVE), ["*.py"]) assert result.excludes_source == "policy" + + +def _skip_if_unavailable(rat_available: tuple[bool, bool]) -> None: + java_ok, jar_ok = rat_available + if not java_ok: + pytest.skip("Java not available") + if not jar_ok: + pytest.skip("RAT JAR not available") --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
