This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/sbp by this push:
new 5b9e317 Change how RAT checks are applied
5b9e317 is described below
commit 5b9e3179a1a6522f4a42f63e1bc99f12be24d1e4
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Jan 5 14:50:51 2026 +0000
Change how RAT checks are applied
---
atr/archives.py | 31 +++--
atr/tasks/checks/__init__.py | 4 +
atr/tasks/checks/rat.py | 296 +++++++++++++++++++++++++++++--------------
3 files changed, 223 insertions(+), 108 deletions(-)
diff --git a/atr/archives.py b/atr/archives.py
index b3bcbf4..2560c95 100644
--- a/atr/archives.py
+++ b/atr/archives.py
@@ -82,7 +82,7 @@ def total_size(tgz_path: str, chunk_size: int = 4096) -> int:
return total_size
-def _archive_extract_member(
+def _archive_extract_member( # noqa: C901
tf: tarfile.TarFile,
member: tarfile.TarInfo,
extract_dir: str,
@@ -101,8 +101,10 @@ def _archive_extract_member(
if member.isdev():
return 0, extracted_paths
+ # Only track files that pass the path safety check
if track_files and isinstance(track_files, set) and (member_basename in
track_files):
- extracted_paths.append(member.name)
+ if _safe_path(extract_dir, member.name) is not None:
+ extracted_paths.append(member.name)
# Check whether extraction would exceed the size limit
if member.isreg() and ((total_extracted + member.size) > max_size):
@@ -114,8 +116,7 @@ def _archive_extract_member(
# Extract directories directly
if member.isdir():
# Ensure the path is safe before extracting
- target_path = os.path.join(extract_dir, member.name)
- if not
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+ if _safe_path(extract_dir, member.name) is None:
log.warning(f"Skipping potentially unsafe path: {member.name}")
return 0, extracted_paths
tf.extract(member, extract_dir, numeric_owner=True)
@@ -144,8 +145,8 @@ def _archive_extract_safe_process_file(
chunk_size: int,
) -> int:
"""Process a single file member during safe archive extraction."""
- target_path = os.path.join(extract_dir, member.name)
- if not
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+ target_path = _safe_path(extract_dir, member.name)
+ if target_path is None:
log.warning(f"Skipping potentially unsafe path: {member.name}")
return 0
@@ -235,7 +236,8 @@ def _archive_extract_safe_process_symlink(member:
tarfile.TarInfo, extract_dir:
def _safe_path(base_dir: str, *paths: str) -> str | None:
"""Return an absolute path within the base_dir built from the given paths,
or None if it escapes."""
target = os.path.abspath(os.path.join(base_dir, *paths))
- if target.startswith(os.path.abspath(base_dir)):
+ abs_base = os.path.abspath(base_dir)
+ if (target == abs_base) or target.startswith(abs_base + os.sep):
return target
return None
@@ -277,8 +279,11 @@ def _zip_archive_extract_member(
extracted_paths: list[str] = [],
) -> tuple[int, list[str]]:
member_basename = os.path.basename(member.name)
- if track_files and (isinstance(track_files, set) and (member_basename in
track_files)):
- extracted_paths.append(member.name)
+
+ # Only track files that pass the path safety check
+ if track_files and isinstance(track_files, set) and (member_basename in
track_files):
+ if _safe_path(extract_dir, member.name) is not None:
+ extracted_paths.append(member.name)
if member_basename.startswith("._"):
return 0, extracted_paths
@@ -290,8 +295,8 @@ def _zip_archive_extract_member(
)
if member.isdir():
- target_path = os.path.join(extract_dir, member.name)
- if not
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+ target_path = _safe_path(extract_dir, member.name)
+ if target_path is None:
log.warning(f"Skipping potentially unsafe path: {member.name}")
return 0, extracted_paths
os.makedirs(target_path, exist_ok=True)
@@ -314,8 +319,8 @@ def _zip_extract_safe_process_file(
max_size: int,
chunk_size: int,
) -> int:
- target_path = os.path.join(extract_dir, member.name)
- if not
os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+ target_path = _safe_path(extract_dir, member.name)
+ if target_path is None:
log.warning(f"Skipping potentially unsafe path: {member.name}")
return 0
diff --git a/atr/tasks/checks/__init__.py b/atr/tasks/checks/__init__.py
index 09a15d3..6bcfb36 100644
--- a/atr/tasks/checks/__init__.py
+++ b/atr/tasks/checks/__init__.py
@@ -204,6 +204,10 @@ class Recorder:
if not await aiofiles.os.path.isfile(path):
return False
+ no_cache_file = self.abs_path_base() / ".atr-no-cache"
+ if await aiofiles.os.path.exists(no_cache_file):
+ return False
+
self.__input_hash = await _compute_file_hash(path)
async with db.session() as data:
diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py
index 65840c8..0ca3d04 100644
--- a/atr/tasks/checks/rat.py
+++ b/atr/tasks/checks/rat.py
@@ -42,7 +42,35 @@ _JAVA_MEMORY_ARGS: Final[list[str]] = []
# "-XX:MaxRAM=256m",
# "-XX:CompressedClassSpaceSize=16m"
# ]
-_RAT_EXCLUDES_FILENAMES: Final[set[str]] = {".rat-excludes",
"rat-excludes.txt"}
+
+# Generated file patterns, always excluded
+_GENERATED_FILE_PATTERNS: Final[list[str]] = [
+ "**/*.bundle.js",
+ "**/*.chunk.js",
+ "**/*.css.map",
+ "**/*.js.map",
+ "**/*.min.css",
+ "**/*.min.js",
+ "**/*.min.map",
+]
+
+# The name of the file that contains the exclusions for the specified archive
+_RAT_EXCLUDES_FILENAME: Final[str] = ".rat-excludes"
+
+# The name of the RAT report file
+_RAT_REPORT_FILENAME: Final[str] = "rat-report.xml"
+
+# Standard exclusions, always applied explicitly
+_STD_EXCLUSIONS_ALWAYS: Final[list[str]] = ["MISC", "HIDDEN_DIR", "MAC"]
+
+# Additional exclusions when no project exclusion file is found
+_STD_EXCLUSIONS_EXTENDED: Final[list[str]] = [
+ "MAVEN",
+ "ECLIPSE",
+ "IDEA",
+ "GIT",
+ "STANDARD_SCMS",
+]
async def check(args: checks.FunctionArguments) -> results.Results | None:
@@ -74,6 +102,50 @@ async def check(args: checks.FunctionArguments) ->
results.Results | None:
return None
def _build_rat_command(
    rat_jar_path: str,
    xml_output_path: str,
    exclude_file: str | None,
) -> list[str]:
    """Assemble the argument vector used to invoke Apache RAT.

    Args:
        rat_jar_path: Path of the RAT jar handed to ``java -jar``.
        xml_output_path: Value passed to ``--output-file``.
        exclude_file: Project exclusion file passed to
            ``--input-exclude-file``, or None when the archive has none.

    Returns:
        The command as a list of arguments, ending with ``-- .``.
    """
    has_exclude_file = exclude_file is not None

    # The extended standard exclusions only apply when the project did not
    # ship its own exclusion file.
    std_exclusions = list(_STD_EXCLUSIONS_ALWAYS)
    if not has_exclude_file:
        std_exclusions.extend(_STD_EXCLUSIONS_EXTENDED)

    # Generated files and the report itself are always excluded; the
    # exclusion file is excluded too when one is in use.
    path_exclusions = [*_GENERATED_FILE_PATTERNS, _RAT_REPORT_FILENAME]
    if has_exclude_file:
        path_exclusions.append(_RAT_EXCLUDES_FILENAME)

    argv: list[str] = [
        "java",
        *_JAVA_MEMORY_ARGS,
        "-jar",
        rat_jar_path,
        "--output-style",
        "xml",
        "--output-file",
        xml_output_path,
        "--counter-max",
        "UNAPPROVED:-1",
        "--counter-min",
        "LICENSE_CATEGORIES:0",
        "LICENSE_NAMES:0",
        "STANDARDS:0",
    ]
    for std_name in std_exclusions:
        argv += ["--input-exclude-std", std_name]
    for excluded in path_exclusions:
        argv += ["--input-exclude", excluded]
    if exclude_file is not None:
        argv += ["--input-exclude-file", exclude_file]
    argv += ["--", "."]
    return argv
+
+
async def _check_core(
args: checks.FunctionArguments, recorder: checks.Recorder,
artifact_abs_path: pathlib.Path
) -> None:
@@ -120,7 +192,7 @@ async def _check_core(
await recorder.success(result_data["message"], result_data)
-def _check_core_logic(
+def _check_core_logic( # noqa: C901
artifact_path: str,
rat_jar_path: str = _CONFIG.APACHE_RAT_JAR_PATH,
max_extract_size: int = _CONFIG.MAX_EXTRACT_SIZE,
@@ -168,29 +240,79 @@ def _check_core_logic(
# Extract the archive to the temporary directory
log.info(f"Extracting {artifact_path} to {temp_dir}")
- extracted_size, extracted_paths = archives.extract(
+ extracted_size, exclude_file_paths = archives.extract(
artifact_path,
temp_dir,
max_size=max_extract_size,
chunk_size=chunk_size,
- track_files=_RAT_EXCLUDES_FILENAMES,
+ track_files={_RAT_EXCLUDES_FILENAME},
)
log.info(f"Extracted {extracted_size} bytes")
+ log.info(f"Found {len(exclude_file_paths)}
{_RAT_EXCLUDES_FILENAME} file(s): {exclude_file_paths}")
- # Find the root directory
- if (extract_dir := _extracted_dir(temp_dir)) is None:
- log.error("No root directory found in archive")
+ # Validate that we found at most one exclusion file
+ if len(exclude_file_paths) > 1:
+ log.error(f"Multiple {_RAT_EXCLUDES_FILENAME} files found:
{exclude_file_paths}")
return {
"valid": False,
- "message": "No root directory found in archive",
- "errors": [],
+ "message": f"Multiple {_RAT_EXCLUDES_FILENAME} files not
allowed (found {len(exclude_file_paths)})",
+ "total_files": 0,
+ "approved_licenses": 0,
+ "unapproved_licenses": 0,
+ "unknown_licenses": 0,
+ "unapproved_files": [],
+ "unknown_license_files": [],
+ "errors": [f"Found {len(exclude_file_paths)}
{_RAT_EXCLUDES_FILENAME} files"],
}
- log.info(f"Using root directory: {extract_dir}")
+ # Narrow to single path after validation
+ exclude_file_path: str | None = exclude_file_paths[0] if
exclude_file_paths else None
+
+ # Determine scan root
+ if exclude_file_path is not None:
+ scan_root = os.path.dirname(os.path.join(temp_dir,
exclude_file_path))
+
+ # Verify that scan_root is inside temp_dir
+ abs_scan_root = os.path.abspath(scan_root)
+ abs_temp_dir = os.path.abspath(temp_dir)
+ scan_root_is_inside = (abs_scan_root == abs_temp_dir) or
abs_scan_root.startswith(abs_temp_dir + os.sep)
+ if not scan_root_is_inside:
+ log.error(f"Scan root {scan_root} is outside temp_dir
{temp_dir}")
+ return {
+ "valid": False,
+ "message": "Invalid archive structure: exclusion file
path escapes extraction directory",
+ "total_files": 0,
+ "approved_licenses": 0,
+ "unapproved_licenses": 0,
+ "unknown_licenses": 0,
+ "unapproved_files": [],
+ "unknown_license_files": [],
+ "errors": ["Exclusion file path escapes extraction
directory"],
+ }
+
+ log.info(f"Using {_RAT_EXCLUDES_FILENAME} directory as scan
root: {scan_root}")
+
+ untracked_count = _count_files_outside_directory(temp_dir,
scan_root)
+ if untracked_count > 0:
+ log.error(f"Found {untracked_count} file(s) outside
{_RAT_EXCLUDES_FILENAME} directory")
+ return {
+ "valid": False,
+ "message": f"Files exist outside
{_RAT_EXCLUDES_FILENAME} directory ({untracked_count} found)",
+ "total_files": 0,
+ "approved_licenses": 0,
+ "unapproved_licenses": 0,
+ "unknown_licenses": 0,
+ "unapproved_files": [],
+ "unknown_license_files": [],
+ "errors": [f"{untracked_count} file(s) outside
{_RAT_EXCLUDES_FILENAME} directory"],
+ }
+ else:
+ scan_root = temp_dir
+ log.info(f"No {_RAT_EXCLUDES_FILENAME} found, using temp_dir
as scan root: {scan_root}")
# Execute RAT and get results or error
error_result, xml_output_path = _check_core_logic_execute_rat(
- rat_jar_path, extract_dir, temp_dir, extracted_paths
+ rat_jar_path, scan_root, temp_dir, exclude_file_path
)
if error_result:
return error_result
@@ -201,18 +323,24 @@ def _check_core_logic(
if xml_output_path is None:
raise ValueError("XML output path is None")
- results = _check_core_logic_parse_output(xml_output_path,
extract_dir)
+ results = _check_core_logic_parse_output(xml_output_path,
scan_root)
log.info(f"Successfully parsed RAT output with
{util.plural(results.get('total_files', 0), 'file')}")
- # The unknown_license_files key may contain a list of dicts
+ # The unknown_license_files and unapproved_files keys contain
lists of dicts
# {"name": "./README.md", "license": "Unknown license"}
- # The path is missing the root of the archive, so we add it
- extract_dir_basename = os.path.basename(extract_dir)
- for file in results["unknown_license_files"]:
- file["name"] = os.path.join(
- extract_dir_basename,
- os.path.normpath(file["name"]),
- )
+ # The path is relative to scan_root, so we prepend the scan_root
relative path
+ scan_root_rel = os.path.relpath(scan_root, temp_dir)
+ if scan_root_rel != ".":
+ for file in results["unknown_license_files"]:
+ file["name"] = os.path.join(
+ scan_root_rel,
+ os.path.normpath(file["name"]),
+ )
+ for file in results["unapproved_files"]:
+ file["name"] = os.path.join(
+ scan_root_rel,
+ os.path.normpath(file["name"]),
+ )
return results
@@ -234,42 +362,37 @@ def _check_core_logic(
def _check_core_logic_execute_rat(
- rat_jar_path: str, extract_dir: str, temp_dir: str, excluded_paths:
list[str]
+ rat_jar_path: str, scan_root: str, temp_dir: str, exclude_file_path: str |
None
) -> tuple[dict[str, Any] | None, str | None]:
"""Execute Apache RAT and process its output."""
- # Define output file path
- xml_output_path = os.path.join(temp_dir, "rat-report.xml")
+ xml_output_path = os.path.join(temp_dir, _RAT_REPORT_FILENAME)
log.info(f"XML output will be written to: {xml_output_path}")
- # Run Apache RAT on the extracted directory
- # TODO: From RAT 0.17, --exclude will become --input-exclude
- # TODO: Check whether --exclude NAME works on inner files
- # (Note that we presently use _rat_apply_exclusions to apply exclusions
instead)
- command = [
- "java",
- *_JAVA_MEMORY_ARGS,
- "-jar",
- rat_jar_path,
- "--output-style",
- "xml",
- "--output-file",
- xml_output_path,
- "--counter-max",
- "UNAPPROVED:-1",
- "--counter-min",
- "LICENSE_CATEGORIES:0",
- "LICENSE_NAMES:0",
- "STANDARDS:0",
- "--",
- ".",
- ]
- if excluded_paths:
- _rat_apply_exclusions(extract_dir, excluded_paths, temp_dir)
+ # Convert exclusion file path from temp_dir relative to scan_root relative
+ exclude_file: str | None = None
+ if exclude_file_path is not None:
+ abs_path = os.path.join(temp_dir, exclude_file_path)
+ if not (os.path.exists(abs_path) and os.path.isfile(abs_path)):
+ log.error(f"Exclusion file not found or not a regular file:
{abs_path}")
+ return {
+ "valid": False,
+ "message": f"Exclusion file is not a regular file:
{exclude_file_path}",
+ "total_files": 0,
+ "approved_licenses": 0,
+ "unapproved_licenses": 0,
+ "unknown_licenses": 0,
+ "unapproved_files": [],
+ "unknown_license_files": [],
+ "errors": [f"Expected exclusion file but found: {abs_path}"],
+ }, None
+ exclude_file = os.path.relpath(abs_path, scan_root)
+ log.info(f"Using exclusion file: {exclude_file}")
+ command = _build_rat_command(rat_jar_path, xml_output_path, exclude_file)
log.info(f"Running Apache RAT: {' '.join(command)}")
- # Change working directory to extract_dir when running the process
+ # Change working directory to scan_root when running the process
current_dir = os.getcwd()
- os.chdir(extract_dir)
+ os.chdir(scan_root)
log.info(f"Executing Apache RAT from directory: {os.getcwd()}")
@@ -549,53 +672,36 @@ def _check_java_installed() -> dict[str, Any] | None:
}
-def _extracted_dir(temp_dir: str) -> str | None:
- # Loop through all the dirs in temp_dir
- extract_dir = None
- log.info(f"Checking directories in {temp_dir}: {os.listdir(temp_dir)}")
- for dir_name in os.listdir(temp_dir):
- if dir_name.startswith("."):
- continue
- dir_path = os.path.join(temp_dir, dir_name)
- if not os.path.isdir(dir_path):
- raise ValueError(f"Unknown file type found in temporary directory:
{dir_path}")
- if extract_dir is None:
- extract_dir = dir_path
- else:
- raise ValueError(f"Multiple root directories found: {extract_dir},
{dir_path}")
- return extract_dir
-
-
-def _rat_apply_exclusions(extract_dir: str, excluded_paths: list[str],
temp_dir: str) -> None:
- """Apply exclusions to the extracted directory."""
- # Exclusions are difficult using the command line version of RAT
- # Each line is interpreted as a literal AND a glob AND a regex
- # Then, if ANY of those three match a filename, the file is excluded
- # You cannot specify which syntax to use; all three are always tried
- # You cannot specify that you want to match against the whole path
- # Therefore, we take a different approach
- # We interpret the exclusion file as a glob file in .gitignore format
- # Then, we simply remove any files that match the glob
- exclusion_lines = []
- for excluded_path in excluded_paths:
- abs_excluded_path = os.path.join(temp_dir, excluded_path)
- if not os.path.exists(abs_excluded_path):
- log.error(f"Exclusion file not found: {abs_excluded_path}")
- continue
- if not os.path.isfile(abs_excluded_path):
- log.error(f"Exclusion file is not a file: {abs_excluded_path}")
def _count_files_outside_directory(temp_dir: str, scan_root: str) -> int:
    """Count files under temp_dir that are not inside scan_root.

    Every entry that os.walk reports as a file is counted, including files
    whose names start with a dot.

    Args:
        temp_dir: Directory tree to walk.
        scan_root: Directory whose contents are not counted.

    Returns:
        The number of files found outside scan_root; 0 when scan_root is
        temp_dir itself.
    """
    scan_root_rel = os.path.relpath(scan_root, temp_dir)
    if scan_root_rel == ".":
        # The scan root is the whole tree, so nothing can be outside it
        return 0

    count = 0
    for root, dirs, files in os.walk(temp_dir):
        rel_root = os.path.relpath(root, temp_dir)
        if rel_root == ".":
            rel_root = ""

        if _is_inside_directory(rel_root, scan_root_rel):
            # Everything below here is inside the scan root as well, so
            # prune the walk instead of visiting and skipping each subdir
            dirs[:] = []
            continue

        count += len(files)

    return count


def _is_inside_directory(path: str, directory: str) -> bool:
    """Check whether path is inside directory, or is the directory itself.

    Both arguments are compared as plain strings; an empty directory is
    treated as containing every path.
    """
    if directory == "":
        return True
    # Require a separator after the prefix so "pkg-extra" is not "in" "pkg"
    return (path == directory) or path.startswith(directory + os.sep)
def _summary_message(valid: bool, unapproved_licenses: int, unknown_licenses:
int) -> str:
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]