This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git


The following commit(s) were added to refs/heads/main by this push:
     new d8bc263  Extract hard and soft link member types in archive files
d8bc263 is described below

commit d8bc263cdc1ea431cef09b33d62d27e59f42193f
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Jun 23 16:16:35 2025 +0100

    Extract hard and soft link member types in archive files
---
 atr/tasks/checks/rat.py |   3 +-
 atr/tasks/sbom.py       | 140 +++++++++++++++++++++++++++++++++++++-----------
 2 files changed, 110 insertions(+), 33 deletions(-)

diff --git a/atr/tasks/checks/rat.py b/atr/tasks/checks/rat.py
index 169fcb0..ec6d125 100644
--- a/atr/tasks/checks/rat.py
+++ b/atr/tasks/checks/rat.py
@@ -93,6 +93,7 @@ def _check_core_logic(
     _LOGGER.info(f"PATH environment variable: {os.environ.get('PATH', 'PATH not found')}")
 
     # Check that Java is installed
+    # TODO: Run this only once, when the server starts
     try:
         java_version = subprocess.check_output(
             ["java", *_JAVA_MEMORY_ARGS, "-version"], stderr=subprocess.STDOUT, text=True
@@ -116,7 +117,7 @@ def _check_core_logic(
 
             # Try to find where Java might be located
             which_java = subprocess.run(["which", "java"], capture_output=True, text=True, check=False)
-            which_java_result = which_java.stdout.strip() if which_java.returncode == 0 else "not found"
+            which_java_result = which_java.stdout.strip() if (which_java.returncode == 0) else "not found"
             _LOGGER.info(f"Result for which java: {which_java_result}")
         except Exception as inner_e:
             _LOGGER.error(f"Additional error while trying to debug java: {inner_e}")
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 530db6c..c4967f5 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -61,38 +61,11 @@ def archive_extract_safe(
     try:
         with tarfile.open(archive_path, mode="r|gz") as tf:
             for member in tf:
-                if member.name and member.name.split("/")[-1].startswith("._"):
-                    # Metadata convention
-                    continue
-
-                # Skip anything that's not a file or directory
-                if not (member.isreg() or member.isdir()):
-                    continue
-
-                # Check whether extraction would exceed the size limit
-                if member.isreg() and ((total_extracted + member.size) > max_size):
-                    raise SBOMGenerationError(
-                        f"Extraction would exceed maximum size limit of {max_size} bytes",
-                        {"max_size": max_size, "current_size": total_extracted, "file_size": member.size},
-                    )
-
-                # Extract directories directly
-                if member.isdir():
-                    # Ensure the path is safe before extracting
-                    target_path = os.path.join(extract_dir, member.name)
-                    if not os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
-                        _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
-                        continue
-                    tf.extract(member, extract_dir, numeric_owner=True)
-                    continue
-
-                if member.isreg():
-                    extracted_size = _archive_extract_safe_process_file(
-                        tf, member, extract_dir, total_extracted, max_size, chunk_size
-                    )
-                    total_extracted += extracted_size
-
-                # TODO: Add other types here
+                keep_going, total_extracted = archive_extract_member(
+                    tf, member, extract_dir, total_extracted, max_size, chunk_size
+                )
+                if not keep_going:
+                    break
 
     except tarfile.ReadError as e:
         raise SBOMGenerationError(f"Failed to read archive: {e}", {"archive_path": archive_path}) from e
@@ -159,6 +132,101 @@ def _archive_extract_safe_process_file(
     return extracted_file_size
 
 
+def archive_extract_member(
+    tf: tarfile.TarFile, member: tarfile.TarInfo, extract_dir: str, total_extracted: int, max_size: int, chunk_size: int
+) -> tuple[bool, int]:
+    if member.name and member.name.split("/")[-1].startswith("._"):
+        # Metadata convention
+        return False, 0
+
+    # Skip any character device, block device, or FIFO
+    if member.isdev():
+        return False, 0
+
+    # Check whether extraction would exceed the size limit
+    if member.isreg() and ((total_extracted + member.size) > max_size):
+        raise SBOMGenerationError(
+            f"Extraction would exceed maximum size limit of {max_size} bytes",
+            {"max_size": max_size, "current_size": total_extracted, "file_size": member.size},
+        )
+
+    # Extract directories directly
+    if member.isdir():
+        # Ensure the path is safe before extracting
+        target_path = os.path.join(extract_dir, member.name)
+        if not os.path.abspath(target_path).startswith(os.path.abspath(extract_dir)):
+            _LOGGER.warning(f"Skipping potentially unsafe path: {member.name}")
+            return False, 0
+        tf.extract(member, extract_dir, numeric_owner=True)
+
+    elif member.isreg():
+        extracted_size = _archive_extract_safe_process_file(
+            tf, member, extract_dir, total_extracted, max_size, chunk_size
+        )
+        total_extracted += extracted_size
+
+    elif member.issym():
+        _archive_extract_safe_process_symlink(member, extract_dir)
+
+    elif member.islnk():
+        _archive_extract_safe_process_hardlink(member, extract_dir)
+
+    return True, total_extracted
+
+
+def _archive_extract_safe_process_hardlink(member: tarfile.TarInfo, extract_dir: str) -> None:
+    """Safely create a hard link from the TarInfo entry."""
+    target_path = _safe_path(extract_dir, member.name)
+    if target_path is None:
+        _LOGGER.warning(f"Skipping potentially unsafe hard link path: {member.name}")
+        return
+
+    link_target = member.linkname or ""
+    source_path = _safe_path(extract_dir, link_target)
+    if source_path is None or not os.path.exists(source_path):
+        _LOGGER.warning(f"Skipping hard link with invalid target: {member.name} -> {link_target}")
+        return
+
+    os.makedirs(os.path.dirname(target_path), exist_ok=True)
+
+    try:
+        if os.path.lexists(target_path):
+            return
+        os.link(source_path, target_path)
+    except (OSError, NotImplementedError) as e:
+        _LOGGER.warning(f"Failed to create hard link {target_path} -> {source_path}: {e}")
+
+
+def _archive_extract_safe_process_symlink(member: tarfile.TarInfo, extract_dir: str) -> None:
+    """Safely create a symbolic link from the TarInfo entry."""
+    target_path = _safe_path(extract_dir, member.name)
+    if target_path is None:
+        _LOGGER.warning(f"Skipping potentially unsafe symlink path: {member.name}")
+        return
+
+    link_target = member.linkname or ""
+
+    # Reject absolute targets to avoid links outside the tree
+    if os.path.isabs(link_target):
+        _LOGGER.warning(f"Skipping symlink with absolute target: {member.name} -> {link_target}")
+        return
+
+    # Ensure that the resolved link target stays within the extraction directory
+    resolved_target = _safe_path(os.path.dirname(target_path), link_target)
+    if resolved_target is None:
+        _LOGGER.warning(f"Skipping symlink pointing outside tree: {member.name} -> {link_target}")
+        return
+
+    os.makedirs(os.path.dirname(target_path), exist_ok=True)
+
+    try:
+        if os.path.lexists(target_path):
+            return
+        os.symlink(link_target, target_path)
+    except (OSError, NotImplementedError) as e:
+        _LOGGER.warning("Failed to create symlink %s -> %s: %s", target_path, link_target, e)
+
+
 async def _generate_cyclonedx_core(artifact_path: str, output_path: str) -> dict[str, Any]:
     """Core logic to generate CycloneDX SBOM, raising SBOMGenerationError on failure."""
     _LOGGER.info(f"Generating CycloneDX SBOM for {artifact_path} -> {output_path}")
@@ -247,3 +315,11 @@ async def _generate_cyclonedx_core(artifact_path: str, output_path: str) -> dict
         except FileNotFoundError:
             _LOGGER.error("syft command not found. Is it installed and in PATH?")
             raise SBOMGenerationError("syft command not found")
+
+
+def _safe_path(base_dir: str, *paths: str) -> str | None:
+    """Return an absolute path within the base_dir built from the given paths, or None if it escapes."""
+    target = os.path.abspath(os.path.join(base_dir, *paths))
+    if target.startswith(os.path.abspath(base_dir)):
+        return target
+    return None


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to