This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/sbp by this push:
     new 5b9e317  Change how RAT checks are applied
5b9e317 is described below

commit 5b9e3179a1a6522f4a42f63e1bc99f12be24d1e4
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Jan 5 14:50:51 2026 +0000

    Change how RAT checks are applied
---
 atr/archives.py              |  31 +++--
 atr/tasks/checks/__init__.py |   4 +
 atr/tasks/checks/rat.py      | 296 +++++++++++++++++++++++++++++--------------
 3 files changed, 223 insertions(+), 108 deletions(-)

diff --git a/atr/archives.py b/atr/archives.py
index b3bcbf4..2560c95 100644
--- a/atr/archives.py
+++ b/atr/archives.py
@@ -82,7 +82,7 @@ def total_size(tgz_path: str, chunk_size: int = 4096) -> int:
     return total_size
 
 
-def _archive_extract_member(
+def _archive_extract_member(  # noqa: C901
     tf: tarfile.TarFile,
     member: tarfile.TarInfo,
     extract_dir: str,
@@ -101,8 +101,10 @@ def _archive_extract_member(
     if member.isdev():
         return 0, extracted_paths
 
+    # Only track files that pass the path safety check
     if track_files and isinstance(track_files, set) and (member_basename in 
track_files):
-        extracted_paths.append(member.name)
+        if _safe_path(extract_dir, member.name) is not None:
+            extracted_paths.append(member.name)
 
     # Check whether extraction would exceed the size limit
     if member.isreg() and ((total_extracted + member.size) > max_size):
@@ -114,8 +116,7 @@ def _archive_extract_member(
     # Extract directories directly
     if member.isdir():
         # Ensure the path is safe before extracting
-        target_path = os.path.join(extract_dir, member.name)
-        if not 
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+        if _safe_path(extract_dir, member.name) is None:
             log.warning(f"Skipping potentially unsafe path: {member.name}")
             return 0, extracted_paths
         tf.extract(member, extract_dir, numeric_owner=True)
@@ -144,8 +145,8 @@ def _archive_extract_safe_process_file(
     chunk_size: int,
 ) -> int:
     """Process a single file member during safe archive extraction."""
-    target_path = os.path.join(extract_dir, member.name)
-    if not 
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+    target_path = _safe_path(extract_dir, member.name)
+    if target_path is None:
         log.warning(f"Skipping potentially unsafe path: {member.name}")
         return 0
 
@@ -235,7 +236,8 @@ def _archive_extract_safe_process_symlink(member: 
tarfile.TarInfo, extract_dir:
 def _safe_path(base_dir: str, *paths: str) -> str | None:
     """Return an absolute path within the base_dir built from the given paths, 
or None if it escapes."""
     target = os.path.abspath(os.path.join(base_dir, *paths))
-    if target.startswith(os.path.abspath(base_dir)):
+    abs_base = os.path.abspath(base_dir)
+    if (target == abs_base) or target.startswith(abs_base + os.sep):
         return target
     return None
 
@@ -277,8 +279,11 @@ def _zip_archive_extract_member(
     extracted_paths: list[str] = [],
 ) -> tuple[int, list[str]]:
     member_basename = os.path.basename(member.name)
-    if track_files and (isinstance(track_files, set) and (member_basename in 
track_files)):
-        extracted_paths.append(member.name)
+
+    # Only track files that pass the path safety check
+    if track_files and isinstance(track_files, set) and (member_basename in 
track_files):
+        if _safe_path(extract_dir, member.name) is not None:
+            extracted_paths.append(member.name)
 
     if member_basename.startswith("._"):
         return 0, extracted_paths
@@ -290,8 +295,8 @@ def _zip_archive_extract_member(
         )
 
     if member.isdir():
-        target_path = os.path.join(extract_dir, member.name)
-        if not 
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+        target_path = _safe_path(extract_dir, member.name)
+        if target_path is None:
             log.warning(f"Skipping potentially unsafe path: {member.name}")
             return 0, extracted_paths
         os.makedirs(target_path, exist_ok=True)
@@ -314,8 +319,8 @@ def _zip_extract_safe_process_file(
     max_size: int,
     chunk_size: int,
 ) -> int:
-    target_path = os.path.join(extract_dir, member.name)
-    if not 
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+    target_path = _safe_path(extract_dir, member.name)
+    if target_path is None:
         log.warning(f"Skipping potentially unsafe path: {member.name}")
         return 0
 
diff --git a/atr/tasks/checks/__init__.py b/atr/tasks/checks/__init__.py
index 09a15d3..6bcfb36 100644
--- a/atr/tasks/checks/__init__.py
+++ b/atr/tasks/checks/__init__.py
@@ -204,6 +204,10 @@ class Recorder:
         if not await aiofiles.os.path.isfile(path):
             return False
 
+        no_cache_file = self.abs_path_base() / ".atr-no-cache"
+        if await aiofiles.os.path.exists(no_cache_file):
+            return False
+
         self.__input_hash = await _compute_file_hash(path)
 
         async with db.session() as data:
diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py
index 65840c8..0ca3d04 100644
--- a/atr/tasks/checks/rat.py
+++ b/atr/tasks/checks/rat.py
@@ -42,7 +42,35 @@ _JAVA_MEMORY_ARGS: Final[list[str]] = []
 #     "-XX:MaxRAM=256m",
 #     "-XX:CompressedClassSpaceSize=16m"
 # ]
-_RAT_EXCLUDES_FILENAMES: Final[set[str]] = {".rat-excludes", 
"rat-excludes.txt"}
+
+# Generated file patterns, always excluded
+_GENERATED_FILE_PATTERNS: Final[list[str]] = [
+    "**/*.bundle.js",
+    "**/*.chunk.js",
+    "**/*.css.map",
+    "**/*.js.map",
+    "**/*.min.css",
+    "**/*.min.js",
+    "**/*.min.map",
+]
+
+# The name of the file that contains the exclusions for the specified archive
+_RAT_EXCLUDES_FILENAME: Final[str] = ".rat-excludes"
+
+# The name of the RAT report file
+_RAT_REPORT_FILENAME: Final[str] = "rat-report.xml"
+
+# Standard exclusions, always applied explicitly
+_STD_EXCLUSIONS_ALWAYS: Final[list[str]] = ["MISC", "HIDDEN_DIR", "MAC"]
+
+# Additional exclusions when no project exclusion file is found
+_STD_EXCLUSIONS_EXTENDED: Final[list[str]] = [
+    "MAVEN",
+    "ECLIPSE",
+    "IDEA",
+    "GIT",
+    "STANDARD_SCMS",
+]
 
 
 async def check(args: checks.FunctionArguments) -> results.Results | None:
@@ -74,6 +102,50 @@ async def check(args: checks.FunctionArguments) -> 
results.Results | None:
     return None
 
 
+def _build_rat_command(
+    rat_jar_path: str,
+    xml_output_path: str,
+    exclude_file: str | None,
+) -> list[str]:
+    """Build the RAT command with appropriate exclusions."""
+    command = [
+        "java",
+        *_JAVA_MEMORY_ARGS,
+        "-jar",
+        rat_jar_path,
+        "--output-style",
+        "xml",
+        "--output-file",
+        xml_output_path,
+        "--counter-max",
+        "UNAPPROVED:-1",
+        "--counter-min",
+        "LICENSE_CATEGORIES:0",
+        "LICENSE_NAMES:0",
+        "STANDARDS:0",
+    ]
+
+    for std in _STD_EXCLUSIONS_ALWAYS:
+        command.extend(["--input-exclude-std", std])
+
+    if exclude_file is None:
+        for std in _STD_EXCLUSIONS_EXTENDED:
+            command.extend(["--input-exclude-std", std])
+
+    for pattern in _GENERATED_FILE_PATTERNS:
+        command.extend(["--input-exclude", pattern])
+
+    command.extend(["--input-exclude", _RAT_REPORT_FILENAME])
+
+    if exclude_file is not None:
+        command.extend(["--input-exclude", _RAT_EXCLUDES_FILENAME])
+        command.extend(["--input-exclude-file", exclude_file])
+
+    command.extend(["--", "."])
+
+    return command
+
+
 async def _check_core(
     args: checks.FunctionArguments, recorder: checks.Recorder, 
artifact_abs_path: pathlib.Path
 ) -> None:
@@ -120,7 +192,7 @@ async def _check_core(
         await recorder.success(result_data["message"], result_data)
 
 
-def _check_core_logic(
+def _check_core_logic(  # noqa: C901
     artifact_path: str,
     rat_jar_path: str = _CONFIG.APACHE_RAT_JAR_PATH,
     max_extract_size: int = _CONFIG.MAX_EXTRACT_SIZE,
@@ -168,29 +240,79 @@ def _check_core_logic(
 
             # Extract the archive to the temporary directory
             log.info(f"Extracting {artifact_path} to {temp_dir}")
-            extracted_size, extracted_paths = archives.extract(
+            extracted_size, exclude_file_paths = archives.extract(
                 artifact_path,
                 temp_dir,
                 max_size=max_extract_size,
                 chunk_size=chunk_size,
-                track_files=_RAT_EXCLUDES_FILENAMES,
+                track_files={_RAT_EXCLUDES_FILENAME},
             )
             log.info(f"Extracted {extracted_size} bytes")
+            log.info(f"Found {len(exclude_file_paths)} 
{_RAT_EXCLUDES_FILENAME} file(s): {exclude_file_paths}")
 
-            # Find the root directory
-            if (extract_dir := _extracted_dir(temp_dir)) is None:
-                log.error("No root directory found in archive")
+            # Validate that we found at most one exclusion file
+            if len(exclude_file_paths) > 1:
+                log.error(f"Multiple {_RAT_EXCLUDES_FILENAME} files found: 
{exclude_file_paths}")
                 return {
                     "valid": False,
-                    "message": "No root directory found in archive",
-                    "errors": [],
+                    "message": f"Multiple {_RAT_EXCLUDES_FILENAME} files not 
allowed (found {len(exclude_file_paths)})",
+                    "total_files": 0,
+                    "approved_licenses": 0,
+                    "unapproved_licenses": 0,
+                    "unknown_licenses": 0,
+                    "unapproved_files": [],
+                    "unknown_license_files": [],
+                    "errors": [f"Found {len(exclude_file_paths)} 
{_RAT_EXCLUDES_FILENAME} files"],
                 }
 
-            log.info(f"Using root directory: {extract_dir}")
+            # Narrow to single path after validation
+            exclude_file_path: str | None = exclude_file_paths[0] if 
exclude_file_paths else None
+
+            # Determine scan root
+            if exclude_file_path is not None:
+                scan_root = os.path.dirname(os.path.join(temp_dir, 
exclude_file_path))
+
+                # Verify that scan_root is inside temp_dir
+                abs_scan_root = os.path.abspath(scan_root)
+                abs_temp_dir = os.path.abspath(temp_dir)
+                scan_root_is_inside = (abs_scan_root == abs_temp_dir) or 
abs_scan_root.startswith(abs_temp_dir + os.sep)
+                if not scan_root_is_inside:
+                    log.error(f"Scan root {scan_root} is outside temp_dir 
{temp_dir}")
+                    return {
+                        "valid": False,
+                        "message": "Invalid archive structure: exclusion file 
path escapes extraction directory",
+                        "total_files": 0,
+                        "approved_licenses": 0,
+                        "unapproved_licenses": 0,
+                        "unknown_licenses": 0,
+                        "unapproved_files": [],
+                        "unknown_license_files": [],
+                        "errors": ["Exclusion file path escapes extraction 
directory"],
+                    }
+
+                log.info(f"Using {_RAT_EXCLUDES_FILENAME} directory as scan 
root: {scan_root}")
+
+                untracked_count = _count_files_outside_directory(temp_dir, 
scan_root)
+                if untracked_count > 0:
+                    log.error(f"Found {untracked_count} file(s) outside 
{_RAT_EXCLUDES_FILENAME} directory")
+                    return {
+                        "valid": False,
+                        "message": f"Files exist outside 
{_RAT_EXCLUDES_FILENAME} directory ({untracked_count} found)",
+                        "total_files": 0,
+                        "approved_licenses": 0,
+                        "unapproved_licenses": 0,
+                        "unknown_licenses": 0,
+                        "unapproved_files": [],
+                        "unknown_license_files": [],
+                        "errors": [f"{untracked_count} file(s) outside 
{_RAT_EXCLUDES_FILENAME} directory"],
+                    }
+            else:
+                scan_root = temp_dir
+                log.info(f"No {_RAT_EXCLUDES_FILENAME} found, using temp_dir 
as scan root: {scan_root}")
 
             # Execute RAT and get results or error
             error_result, xml_output_path = _check_core_logic_execute_rat(
-                rat_jar_path, extract_dir, temp_dir, extracted_paths
+                rat_jar_path, scan_root, temp_dir, exclude_file_path
             )
             if error_result:
                 return error_result
@@ -201,18 +323,24 @@ def _check_core_logic(
             if xml_output_path is None:
                 raise ValueError("XML output path is None")
 
-            results = _check_core_logic_parse_output(xml_output_path, 
extract_dir)
+            results = _check_core_logic_parse_output(xml_output_path, 
scan_root)
             log.info(f"Successfully parsed RAT output with 
{util.plural(results.get('total_files', 0), 'file')}")
 
-            # The unknown_license_files key may contain a list of dicts
+            # The unknown_license_files and unapproved_files keys contain 
lists of dicts
             # {"name": "./README.md", "license": "Unknown license"}
-            # The path is missing the root of the archive, so we add it
-            extract_dir_basename = os.path.basename(extract_dir)
-            for file in results["unknown_license_files"]:
-                file["name"] = os.path.join(
-                    extract_dir_basename,
-                    os.path.normpath(file["name"]),
-                )
+            # The path is relative to scan_root, so we prepend the scan_root 
relative path
+            scan_root_rel = os.path.relpath(scan_root, temp_dir)
+            if scan_root_rel != ".":
+                for file in results["unknown_license_files"]:
+                    file["name"] = os.path.join(
+                        scan_root_rel,
+                        os.path.normpath(file["name"]),
+                    )
+                for file in results["unapproved_files"]:
+                    file["name"] = os.path.join(
+                        scan_root_rel,
+                        os.path.normpath(file["name"]),
+                    )
 
             return results
 
@@ -234,42 +362,37 @@ def _check_core_logic(
 
 
 def _check_core_logic_execute_rat(
-    rat_jar_path: str, extract_dir: str, temp_dir: str, excluded_paths: 
list[str]
+    rat_jar_path: str, scan_root: str, temp_dir: str, exclude_file_path: str | 
None
 ) -> tuple[dict[str, Any] | None, str | None]:
     """Execute Apache RAT and process its output."""
-    # Define output file path
-    xml_output_path = os.path.join(temp_dir, "rat-report.xml")
+    xml_output_path = os.path.join(temp_dir, _RAT_REPORT_FILENAME)
     log.info(f"XML output will be written to: {xml_output_path}")
 
-    # Run Apache RAT on the extracted directory
-    # TODO: From RAT 0.17, --exclude will become --input-exclude
-    # TODO: Check whether --exclude NAME works on inner files
-    # (Note that we presently use _rat_apply_exclusions to apply exclusions 
instead)
-    command = [
-        "java",
-        *_JAVA_MEMORY_ARGS,
-        "-jar",
-        rat_jar_path,
-        "--output-style",
-        "xml",
-        "--output-file",
-        xml_output_path,
-        "--counter-max",
-        "UNAPPROVED:-1",
-        "--counter-min",
-        "LICENSE_CATEGORIES:0",
-        "LICENSE_NAMES:0",
-        "STANDARDS:0",
-        "--",
-        ".",
-    ]
-    if excluded_paths:
-        _rat_apply_exclusions(extract_dir, excluded_paths, temp_dir)
+    # Convert exclusion file path from temp_dir relative to scan_root relative
+    exclude_file: str | None = None
+    if exclude_file_path is not None:
+        abs_path = os.path.join(temp_dir, exclude_file_path)
+        if not (os.path.exists(abs_path) and os.path.isfile(abs_path)):
+            log.error(f"Exclusion file not found or not a regular file: 
{abs_path}")
+            return {
+                "valid": False,
+                "message": f"Exclusion file is not a regular file: 
{exclude_file_path}",
+                "total_files": 0,
+                "approved_licenses": 0,
+                "unapproved_licenses": 0,
+                "unknown_licenses": 0,
+                "unapproved_files": [],
+                "unknown_license_files": [],
+                "errors": [f"Expected exclusion file but found: {abs_path}"],
+            }, None
+        exclude_file = os.path.relpath(abs_path, scan_root)
+        log.info(f"Using exclusion file: {exclude_file}")
+    command = _build_rat_command(rat_jar_path, xml_output_path, exclude_file)
     log.info(f"Running Apache RAT: {' '.join(command)}")
 
-    # Change working directory to extract_dir when running the process
+    # Change working directory to scan_root when running the process
     current_dir = os.getcwd()
-    os.chdir(extract_dir)
+    os.chdir(scan_root)
 
     log.info(f"Executing Apache RAT from directory: {os.getcwd()}")
 
@@ -549,53 +672,36 @@ def _check_java_installed() -> dict[str, Any] | None:
         }
 
 
-def _extracted_dir(temp_dir: str) -> str | None:
-    # Loop through all the dirs in temp_dir
-    extract_dir = None
-    log.info(f"Checking directories in {temp_dir}: {os.listdir(temp_dir)}")
-    for dir_name in os.listdir(temp_dir):
-        if dir_name.startswith("."):
-            continue
-        dir_path = os.path.join(temp_dir, dir_name)
-        if not os.path.isdir(dir_path):
-            raise ValueError(f"Unknown file type found in temporary directory: 
{dir_path}")
-        if extract_dir is None:
-            extract_dir = dir_path
-        else:
-            raise ValueError(f"Multiple root directories found: {extract_dir}, 
{dir_path}")
-    return extract_dir
-
-
-def _rat_apply_exclusions(extract_dir: str, excluded_paths: list[str], 
temp_dir: str) -> None:
-    """Apply exclusions to the extracted directory."""
-    # Exclusions are difficult using the command line version of RAT
-    # Each line is interpreted as a literal AND a glob AND a regex
-    # Then, if ANY of those three match a filename, the file is excluded
-    # You cannot specify which syntax to use; all three are always tried
-    # You cannot specify that you want to match against the whole path
-    # Therefore, we take a different approach
-    # We interpret the exclusion file as a glob file in .gitignore format
-    # Then, we simply remove any files that match the glob
-    exclusion_lines = []
-    for excluded_path in excluded_paths:
-        abs_excluded_path = os.path.join(temp_dir, excluded_path)
-        if not os.path.exists(abs_excluded_path):
-            log.error(f"Exclusion file not found: {abs_excluded_path}")
-            continue
-        if not os.path.isfile(abs_excluded_path):
-            log.error(f"Exclusion file is not a file: {abs_excluded_path}")
+def _count_files_outside_directory(temp_dir: str, scan_root: str) -> int:
+    """Count regular files that exist outside the scan_root directory."""
+    count = 0
+    scan_root_rel = os.path.relpath(scan_root, temp_dir)
+    if scan_root_rel == ".":
+        scan_root_rel = ""
+
+    for root, _dirs, files in os.walk(temp_dir):
+        rel_root = os.path.relpath(root, temp_dir)
+        if rel_root == ".":
+            rel_root = ""
+
+        if _is_inside_directory(rel_root, scan_root_rel):
             continue
-        with open(abs_excluded_path, encoding="utf-8") as f:
-            exclusion_lines.extend(f.readlines())
-    matcher = util.create_path_matcher(
-        exclusion_lines, pathlib.Path(extract_dir) / ".ignore", 
pathlib.Path(extract_dir)
-    )
-    for root, _dirs, files in os.walk(extract_dir):
-        for file in files:
-            abs_path = os.path.join(root, file)
-            if matcher(abs_path):
-                log.info(f"Removing {abs_path} because it matches the 
exclusion")
-                os.remove(abs_path)
+
+        # for filename in files:
+        #     if not filename.startswith("."):
+        #         count += 1
+        count += len(files)
+
+    return count
+
+
+def _is_inside_directory(path: str, directory: str) -> bool:
+    """Check whether path is inside directory, or is the directory itself."""
+    if directory == "":
+        return True
+    if path == directory:
+        return True
+    return path.startswith(directory + os.sep)
 
 
 def _summary_message(valid: bool, unapproved_licenses: int, unknown_licenses: 
int) -> str:


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to