This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit b96895d69940414e6b9a9eca570b0c7b3783ede8
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Feb 24 15:51:50 2026 +0000

    Add some simple archive checks for quarantined file validation
---
 atr/detection.py             |  50 ++++++++++++
 tests/unit/test_detection.py | 181 +++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 231 insertions(+)

diff --git a/atr/detection.py b/atr/detection.py
index 3f17195c..07ecf22d 100644
--- a/atr/detection.py
+++ b/atr/detection.py
@@ -16,11 +16,14 @@
 # under the License.
 
 import pathlib
+import tarfile
+import zipfile
 from typing import Final
 
 import puremagic
 
 import atr.models.attestable as models
+import atr.tarzip as tarzip
 
 _BZIP2_TYPES: Final[set[str]] = {"application/x-bzip2"}
 _DEB_TYPES: Final[set[str]] = {"application/vnd.debian.binary-package", "application/x-archive"}
@@ -61,6 +64,25 @@ _QUARANTINE_ARCHIVE_SUFFIXES: Final[tuple[str, ...]] = (".tar.gz", ".tgz", ".zip
 _QUARANTINE_NORMALISED_SUFFIXES: Final[dict[str, str]] = {".tgz": ".tar.gz"}
 
 
+def check_archive_safety(archive_path: str) -> list[str]:
+    errors: list[str] = []
+    try:
+        with tarzip.open_archive(archive_path) as archive:
+            for member in archive:
+                if _archive_member_has_path_traversal(member.name):
+                    errors.append(f"{member.name}: Archive member path traversal is not allowed")
+
+                if (member.issym() or member.islnk()) and _archive_link_escapes_root(
+                    member.name, member.linkname, is_hardlink=member.islnk()
+                ):
+                    link_target = member.linkname or ""
+                    errors.append(f"{member.name}: Archive link target escapes root ({link_target})")
+    except (tarfile.TarError, zipfile.BadZipFile, ValueError, tarzip.ArchiveMemberLimitExceededError) as e:
+        errors.append(f"Failed to read archive: {e}")
+
+    return errors
+
+
 def detect_archives_requiring_quarantine(
     path_to_hash: dict[str, str], previous_attestable: models.AttestableV1 | None
 ) -> list[str]:
@@ -103,6 +125,34 @@ def validate_directory(directory: pathlib.Path) -> list[str]:
     return errors
 
 
+def _archive_link_escapes_root(member_name: str, link_target: str | None, *, is_hardlink: bool = False) -> bool:
+    if link_target is None:
+        return False
+    if link_target.startswith("/"):
+        return True
+
+    link_parts = pathlib.PurePosixPath(link_target).parts
+    base_parts = () if is_hardlink else pathlib.PurePosixPath(member_name).parent.parts
+    depth = 0
+    for part in (*base_parts, *link_parts):
+        if part in ("", ".", "/"):
+            continue
+        if part == "..":
+            if depth == 0:
+                return True
+            depth -= 1
+        else:
+            depth += 1
+    return False
+
+
+def _archive_member_has_path_traversal(path_key: str) -> bool:
+    if path_key.startswith("/"):
+        return True
+
+    return ".." in pathlib.PurePosixPath(path_key).parts
+
+
 def _path_basename(path_key: str) -> str:
     return path_key.rsplit("/", maxsplit=1)[-1]
 
diff --git a/tests/unit/test_detection.py b/tests/unit/test_detection.py
index 7a89ee46..289c1b8c 100644
--- a/tests/unit/test_detection.py
+++ b/tests/unit/test_detection.py
@@ -15,9 +15,140 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import io
+import pathlib
+import tarfile
+import zipfile
+
 import atr.detection as detection
 import atr.models.attestable as models
 
+type TarArchiveEntry = tuple[str, str, bytes | str]
+
+
+def test_check_archive_safety_accepts_safe_tar_gz(tmp_path):
+    archive_path = tmp_path / "safe.tar.gz"
+    _write_tar_gz(
+        archive_path,
+        [
+            _tar_regular_file("dist/apache-widget-1.0-src.tar.gz", b"payload"),
+            _tar_regular_file("docs/readme.txt", b"hello"),
+        ],
+    )
+
+    assert detection.check_archive_safety(str(archive_path)) == []
+
+
+def test_check_archive_safety_accepts_safe_zip(tmp_path):
+    archive_path = tmp_path / "safe.zip"
+    _write_zip(
+        archive_path,
+        [
+            ("dist/apache-widget-1.0-src.zip", b"payload"),
+            ("docs/readme.txt", b"hello"),
+        ],
+    )
+
+    assert detection.check_archive_safety(str(archive_path)) == []
+
+
+def test_check_archive_safety_rejects_absolute_paths_in_tar_and_zip(tmp_path):
+    tar_path = tmp_path / "unsafe-absolute.tar.gz"
+    _write_tar_gz(
+        tar_path,
+        [
+            _tar_regular_file("/absolute.txt", b"x"),
+        ],
+    )
+    zip_path = tmp_path / "unsafe-absolute.zip"
+    _write_zip(
+        zip_path,
+        [
+            ("/absolute.txt", b"x"),
+        ],
+    )
+
+    tar_errors = detection.check_archive_safety(str(tar_path))
+    zip_errors = detection.check_archive_safety(str(zip_path))
+
+    assert any("/absolute.txt" in error for error in tar_errors)
+    assert any("path traversal" in error for error in tar_errors)
+    assert any("/absolute.txt" in error for error in zip_errors)
+    assert any("path traversal" in error for error in zip_errors)
+
+
+def test_check_archive_safety_rejects_hardlink_target_outside_root(tmp_path):
+    archive_path = tmp_path / "unsafe-hardlink.tar.gz"
+    _write_tar_gz(
+        archive_path,
+        [
+            _tar_regular_file("dist/file.txt", b"ok"),
+            _tar_hardlink("dist/hard", "../../outside.txt"),
+        ],
+    )
+
+    errors = detection.check_archive_safety(str(archive_path))
+
+    assert any("dist/hard" in error for error in errors)
+    assert any("escapes root" in error for error in errors)
+
+
+def test_check_archive_safety_rejects_hardlink_target_resolved_from_root(tmp_path):
+    archive_path = tmp_path / "unsafe-hardlink-depth.tar.gz"
+    _write_tar_gz(
+        archive_path,
+        [
+            _tar_regular_file("a/b/file.txt", b"ok"),
+            _tar_hardlink("a/b/link", "../secret"),
+        ],
+    )
+
+    errors = detection.check_archive_safety(str(archive_path))
+
+    assert any("a/b/link" in error for error in errors)
+    assert any("escapes root" in error for error in errors)
+
+
+def test_check_archive_safety_rejects_parent_path_traversal_in_tar_and_zip(tmp_path):
+    tar_path = tmp_path / "unsafe-parent.tar.gz"
+    _write_tar_gz(
+        tar_path,
+        [
+            _tar_regular_file("../outside.txt", b"x"),
+        ],
+    )
+    zip_path = tmp_path / "unsafe-parent.zip"
+    _write_zip(
+        zip_path,
+        [
+            ("../outside.txt", b"x"),
+        ],
+    )
+
+    tar_errors = detection.check_archive_safety(str(tar_path))
+    zip_errors = detection.check_archive_safety(str(zip_path))
+
+    assert any("../outside.txt" in error for error in tar_errors)
+    assert any("path traversal" in error for error in tar_errors)
+    assert any("../outside.txt" in error for error in zip_errors)
+    assert any("path traversal" in error for error in zip_errors)
+
+
+def test_check_archive_safety_rejects_symlink_target_outside_root(tmp_path):
+    archive_path = tmp_path / "unsafe-symlink.tar.gz"
+    _write_tar_gz(
+        archive_path,
+        [
+            _tar_regular_file("dist/file.txt", b"ok"),
+            _tar_symlink("dist/link", "../../outside.txt"),
+        ],
+    )
+
+    errors = detection.check_archive_safety(str(archive_path))
+
+    assert any("dist/link" in error for error in errors)
+    assert any("escapes root" in error for error in errors)
+
 
 def test_detect_archives_requiring_quarantine_known_hash_and_different_extension():
     previous = models.AttestableV1(
@@ -126,3 +257,53 @@ def test_detect_archives_requiring_quarantine_tgz_and_tar_gz_are_equivalent():
     )
 
     assert result == []
+
+
+def _tar_hardlink(name: str, link_target: str) -> TarArchiveEntry:
+    return ("hardlink", name, link_target)
+
+
+def _tar_regular_file(name: str, data: bytes) -> TarArchiveEntry:
+    return ("file", name, data)
+
+
+def _tar_symlink(name: str, link_target: str) -> TarArchiveEntry:
+    return ("symlink", name, link_target)
+
+
+def _write_tar_gz(archive_path: pathlib.Path, members: list[TarArchiveEntry]) -> None:
+    with tarfile.open(archive_path, "w:gz") as archive:
+        for member_type, member_name, member_data in members:
+            if member_type == "file":
+                if not isinstance(member_data, bytes):
+                    raise ValueError("Tar regular file data must be bytes")
+                info = tarfile.TarInfo(member_name)
+                info.size = len(member_data)
+                archive.addfile(info, io.BytesIO(member_data))
+                continue
+
+            if member_type == "symlink":
+                if not isinstance(member_data, str):
+                    raise ValueError("Tar symlink data must be a path string")
+                info = tarfile.TarInfo(member_name)
+                info.type = tarfile.SYMTYPE
+                info.linkname = member_data
+                archive.addfile(info)
+                continue
+
+            if member_type == "hardlink":
+                if not isinstance(member_data, str):
+                    raise ValueError("Tar hardlink data must be a path string")
+                info = tarfile.TarInfo(member_name)
+                info.type = tarfile.LNKTYPE
+                info.linkname = member_data
+                archive.addfile(info)
+                continue
+
+            raise ValueError(f"Unsupported tar member type: {member_type}")
+
+
+def _write_zip(archive_path: pathlib.Path, members: list[tuple[str, bytes]]) -> None:
+    with zipfile.ZipFile(archive_path, "w") as archive:
+        for member_name, member_data in members:
+            archive.writestr(member_name, member_data)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to