This is an automated email from the ASF dual-hosted git repository. sbp pushed a commit to branch sbp in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit b96895d69940414e6b9a9eca570b0c7b3783ede8 Author: Sean B. Palmer <[email protected]> AuthorDate: Tue Feb 24 15:51:50 2026 +0000 Add some simple archive checks for quarantined file validation --- atr/detection.py | 50 ++++++++++++ tests/unit/test_detection.py | 181 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 231 insertions(+) diff --git a/atr/detection.py b/atr/detection.py index 3f17195c..07ecf22d 100644 --- a/atr/detection.py +++ b/atr/detection.py @@ -16,11 +16,14 @@ # under the License. import pathlib +import tarfile +import zipfile from typing import Final import puremagic import atr.models.attestable as models +import atr.tarzip as tarzip _BZIP2_TYPES: Final[set[str]] = {"application/x-bzip2"} _DEB_TYPES: Final[set[str]] = {"application/vnd.debian.binary-package", "application/x-archive"} @@ -61,6 +64,25 @@ _QUARANTINE_ARCHIVE_SUFFIXES: Final[tuple[str, ...]] = (".tar.gz", ".tgz", ".zip _QUARANTINE_NORMALISED_SUFFIXES: Final[dict[str, str]] = {".tgz": ".tar.gz"} +def check_archive_safety(archive_path: str) -> list[str]: + errors: list[str] = [] + try: + with tarzip.open_archive(archive_path) as archive: + for member in archive: + if _archive_member_has_path_traversal(member.name): + errors.append(f"{member.name}: Archive member path traversal is not allowed") + + if (member.issym() or member.islnk()) and _archive_link_escapes_root( + member.name, member.linkname, is_hardlink=member.islnk() + ): + link_target = member.linkname or "" + errors.append(f"{member.name}: Archive link target escapes root ({link_target})") + except (tarfile.TarError, zipfile.BadZipFile, ValueError, tarzip.ArchiveMemberLimitExceededError) as e: + errors.append(f"Failed to read archive: {e}") + + return errors + + def detect_archives_requiring_quarantine( path_to_hash: dict[str, str], previous_attestable: models.AttestableV1 | None ) -> list[str]: @@ -103,6 +125,34 @@ def validate_directory(directory: pathlib.Path) -> list[str]: return errors +def _archive_link_escapes_root(member_name: str, link_target: str | None, *, is_hardlink: bool = False) -> bool: + if link_target is None: + return False + if link_target.startswith("/"): + return True + + link_parts = pathlib.PurePosixPath(link_target).parts + base_parts = () if is_hardlink else pathlib.PurePosixPath(member_name).parent.parts + depth = 0 + for part in (*base_parts, *link_parts): + if part in ("", ".", "/"): + continue + if part == "..": + if depth == 0: + return True + depth -= 1 + else: + depth += 1 + return False + + +def _archive_member_has_path_traversal(path_key: str) -> bool: + if path_key.startswith("/"): + return True + + return ".." in pathlib.PurePosixPath(path_key).parts + + def _path_basename(path_key: str) -> str: return path_key.rsplit("/", maxsplit=1)[-1] diff --git a/tests/unit/test_detection.py b/tests/unit/test_detection.py index 7a89ee46..289c1b8c 100644 --- a/tests/unit/test_detection.py +++ b/tests/unit/test_detection.py @@ -15,9 +15,140 @@ # specific language governing permissions and limitations # under the License. +import io +import pathlib +import tarfile +import zipfile + import atr.detection as detection import atr.models.attestable as models +type TarArchiveEntry = tuple[str, str, bytes | str] + + +def test_check_archive_safety_accepts_safe_tar_gz(tmp_path): + archive_path = tmp_path / "safe.tar.gz" + _write_tar_gz( + archive_path, + [ + _tar_regular_file("dist/apache-widget-1.0-src.tar.gz", b"payload"), + _tar_regular_file("docs/readme.txt", b"hello"), + ], + ) + + assert detection.check_archive_safety(str(archive_path)) == [] + + +def test_check_archive_safety_accepts_safe_zip(tmp_path): + archive_path = tmp_path / "safe.zip" + _write_zip( + archive_path, + [ + ("dist/apache-widget-1.0-src.zip", b"payload"), + ("docs/readme.txt", b"hello"), + ], + ) + + assert detection.check_archive_safety(str(archive_path)) == [] + + +def test_check_archive_safety_rejects_absolute_paths_in_tar_and_zip(tmp_path): + tar_path = tmp_path / "unsafe-absolute.tar.gz" + _write_tar_gz( + tar_path, + [ + _tar_regular_file("/absolute.txt", b"x"), + ], + ) + zip_path = tmp_path / "unsafe-absolute.zip" + _write_zip( + zip_path, + [ + ("/absolute.txt", b"x"), + ], + ) + + tar_errors = detection.check_archive_safety(str(tar_path)) + zip_errors = detection.check_archive_safety(str(zip_path)) + + assert any("/absolute.txt" in error for error in tar_errors) + assert any("path traversal" in error for error in tar_errors) + assert any("/absolute.txt" in error for error in zip_errors) + assert any("path traversal" in error for error in zip_errors) + + +def test_check_archive_safety_rejects_hardlink_target_outside_root(tmp_path): + archive_path = tmp_path / "unsafe-hardlink.tar.gz" + _write_tar_gz( + archive_path, + [ + _tar_regular_file("dist/file.txt", b"ok"), + _tar_hardlink("dist/hard", "../../outside.txt"), + ], + ) + + errors = detection.check_archive_safety(str(archive_path)) + + assert any("dist/hard" in error for error in errors) + assert any("escapes root" in error for error in errors) + + +def test_check_archive_safety_rejects_hardlink_target_resolved_from_root(tmp_path): + archive_path = tmp_path / "unsafe-hardlink-depth.tar.gz" + _write_tar_gz( + archive_path, + [ + _tar_regular_file("a/b/file.txt", b"ok"), + _tar_hardlink("a/b/link", "../secret"), + ], + ) + + errors = detection.check_archive_safety(str(archive_path)) + + assert any("a/b/link" in error for error in errors) + assert any("escapes root" in error for error in errors) + + +def test_check_archive_safety_rejects_parent_path_traversal_in_tar_and_zip(tmp_path): + tar_path = tmp_path / "unsafe-parent.tar.gz" + _write_tar_gz( + tar_path, + [ + _tar_regular_file("../outside.txt", b"x"), + ], + ) + zip_path = tmp_path / "unsafe-parent.zip" + _write_zip( + zip_path, + [ + ("../outside.txt", b"x"), + ], + ) + + tar_errors = detection.check_archive_safety(str(tar_path)) + zip_errors = detection.check_archive_safety(str(zip_path)) + + assert any("../outside.txt" in error for error in tar_errors) + assert any("path traversal" in error for error in tar_errors) + assert any("../outside.txt" in error for error in zip_errors) + assert any("path traversal" in error for error in zip_errors) + + +def test_check_archive_safety_rejects_symlink_target_outside_root(tmp_path): + archive_path = tmp_path / "unsafe-symlink.tar.gz" + _write_tar_gz( + archive_path, + [ + _tar_regular_file("dist/file.txt", b"ok"), + _tar_symlink("dist/link", "../../outside.txt"), + ], + ) + + errors = detection.check_archive_safety(str(archive_path)) + + assert any("dist/link" in error for error in errors) + assert any("escapes root" in error for error in errors) + def test_detect_archives_requiring_quarantine_known_hash_and_different_extension(): previous = models.AttestableV1( @@ -126,3 +257,53 @@ def test_detect_archives_requiring_quarantine_tgz_and_tar_gz_are_equivalent(): ) assert result == [] + + +def _tar_hardlink(name: str, link_target: str) -> TarArchiveEntry: + return ("hardlink", name, link_target) + + +def _tar_regular_file(name: str, data: bytes) -> TarArchiveEntry: + return ("file", name, data) + + +def _tar_symlink(name: str, link_target: str) -> TarArchiveEntry: + return ("symlink", name, link_target) + + +def _write_tar_gz(archive_path: pathlib.Path, members: list[TarArchiveEntry]) -> None: + with tarfile.open(archive_path, "w:gz") as archive: + for member_type, member_name, member_data in members: + if member_type == "file": + if not isinstance(member_data, bytes): + raise ValueError("Tar regular file data must be bytes") + info = tarfile.TarInfo(member_name) + info.size = len(member_data) + archive.addfile(info, io.BytesIO(member_data)) + continue + + if member_type == "symlink": + if not isinstance(member_data, str): + raise ValueError("Tar symlink data must be a path string") + info = tarfile.TarInfo(member_name) + info.type = tarfile.SYMTYPE + info.linkname = member_data + archive.addfile(info) + continue + + if member_type == "hardlink": + if not isinstance(member_data, str): + raise ValueError("Tar hardlink data must be a path string") + info = tarfile.TarInfo(member_name) + info.type = tarfile.LNKTYPE + info.linkname = member_data + archive.addfile(info) + continue + + raise ValueError(f"Unsupported tar member type: {member_type}") + + +def _write_zip(archive_path: pathlib.Path, members: list[tuple[str, bytes]]) -> None: + with zipfile.ZipFile(archive_path, "w") as archive: + for member_name, member_data in members: + archive.writestr(member_name, member_data) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
