This is an automated email from the ASF dual-hosted git repository.

sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git


The following commit(s) were added to refs/heads/sbp by this push:
     new bc3f4f19 Detect which files need to be quarantined
bc3f4f19 is described below

commit bc3f4f195d18961a7c8d1a937b7d4fe675dfe629
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Feb 24 14:39:57 2026 +0000

    Detect which files need to be quarantined
---
 atr/attestable.py             |  18 +++++-
 atr/detection.py              |  45 +++++++++++++++
 atr/models/attestable.py      |   1 +
 tests/unit/test_attestable.py |  71 +++++++++++++++++++++++
 tests/unit/test_detection.py  | 128 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 261 insertions(+), 2 deletions(-)

diff --git a/atr/attestable.py b/atr/attestable.py
index 91a65470..11de4354 100644
--- a/atr/attestable.py
+++ b/atr/attestable.py
@@ -207,7 +207,7 @@ async def write_files_data(
             await f.write(models.AttestableChecksV2().model_dump_json(indent=2))
 
 
-def _compute_hashes_with_attribution(
+def _compute_hashes_with_attribution(  # noqa: C901
     current_hash_to_paths: dict[str, set[str]],
     path_to_size: dict[str, int],
     previous: models.AttestableV1 | None,
@@ -228,13 +228,23 @@ def _compute_hashes_with_attribution(
         previous_paths = previous_hash_to_paths.get(hash_ref, set())
         sample_path = next(iter(current_paths))
         file_size = path_to_size[sample_path]
+        current_basenames = {_path_basename(path_key) for path_key in current_paths}
 
         if hash_ref not in new_hashes:
             new_hashes[hash_ref] = models.HashEntry(
                 size=file_size,
                 uploaders=[(uploader_uid, revision_number)],
+                basenames=sorted(current_basenames),
             )
-        elif len(current_paths) > len(previous_paths):
+            continue
+
+        existing_basenames = set(new_hashes[hash_ref].basenames)
+        for basename in sorted(current_basenames):
+            if basename not in existing_basenames:
+                new_hashes[hash_ref].basenames.append(basename)
+                existing_basenames.add(basename)
+
+        if len(current_paths) > len(previous_paths):
             existing_entries = set(new_hashes[hash_ref].uploaders)
             if (uploader_uid, revision_number) not in existing_entries:
                 new_hashes[hash_ref].uploaders.append((uploader_uid, revision_number))
@@ -263,3 +273,7 @@ def _generate_files_data(
         hashes=dict(new_hashes),
         policy=release_policy or {},
     )
+
+
+def _path_basename(path_key: str) -> str:
+    return path_key.rsplit("/", maxsplit=1)[-1]
diff --git a/atr/detection.py b/atr/detection.py
index 0e52ce47..3f17195c 100644
--- a/atr/detection.py
+++ b/atr/detection.py
@@ -20,6 +20,8 @@ from typing import Final
 
 import puremagic
 
+import atr.models.attestable as models
+
 _BZIP2_TYPES: Final[set[str]] = {"application/x-bzip2"}
 _DEB_TYPES: Final[set[str]] = {"application/vnd.debian.binary-package", "application/x-archive"}
 _EXE_TYPES: Final[set[str]] = {"application/vnd.microsoft.portable-executable", "application/octet-stream"}
@@ -55,6 +57,37 @@ _EXPECTED: Final[dict[str, set[str]]] = {
 }
 
 _COMPOUND_SUFFIXES: Final = tuple(s for s in _EXPECTED if s.count(".") > 1)
+_QUARANTINE_ARCHIVE_SUFFIXES: Final[tuple[str, ...]] = (".tar.gz", ".tgz", ".zip")
+_QUARANTINE_NORMALISED_SUFFIXES: Final[dict[str, str]] = {".tgz": ".tar.gz"}
+
+
+def detect_archives_requiring_quarantine(
+    path_to_hash: dict[str, str], previous_attestable: models.AttestableV1 | None
+) -> list[str]:
+    quarantine_paths: list[str] = []
+    for path_key, hash_ref in path_to_hash.items():
+        basename = _path_basename(path_key)
+        suffix = _quarantine_archive_suffix(basename)
+        if suffix is None:
+            continue
+
+        if previous_attestable is None:
+            quarantine_paths.append(path_key)
+            continue
+
+        historical_hash_entry = previous_attestable.hashes.get(hash_ref)
+        if historical_hash_entry is None:
+            quarantine_paths.append(path_key)
+            continue
+
+        if "basenames" not in historical_hash_entry.model_fields_set:
+            quarantine_paths.append(path_key)
+            continue
+
+        if not any(_quarantine_archive_suffix(b) == suffix for b in historical_hash_entry.basenames):
+            quarantine_paths.append(path_key)
+
+    return quarantine_paths
 
 
 def validate_directory(directory: pathlib.Path) -> list[str]:
@@ -70,6 +103,18 @@ def validate_directory(directory: pathlib.Path) -> list[str]:
     return errors
 
 
+def _path_basename(path_key: str) -> str:
+    return path_key.rsplit("/", maxsplit=1)[-1]
+
+
+def _quarantine_archive_suffix(filename: str) -> str | None:
+    lower_name = filename.lower()
+    for suffix in _QUARANTINE_ARCHIVE_SUFFIXES:
+        if lower_name.endswith(suffix):
+            return _QUARANTINE_NORMALISED_SUFFIXES.get(suffix, suffix)
+    return None
+
+
 def _suffix(filename: str) -> str:
     name = filename.lower()
     for compound in _COMPOUND_SUFFIXES:
diff --git a/atr/models/attestable.py b/atr/models/attestable.py
index 548a77ea..1e1bd7bb 100644
--- a/atr/models/attestable.py
+++ b/atr/models/attestable.py
@@ -25,6 +25,7 @@ from . import schema
 class HashEntry(schema.Strict):
     size: int
+    uploaders: list[Annotated[tuple[str, str], pydantic.BeforeValidator(tuple)]]
+    basenames: list[str] = schema.factory(list)
 
 
 class AttestableChecksV1(schema.Strict):
diff --git a/tests/unit/test_attestable.py b/tests/unit/test_attestable.py
new file mode 100644
index 00000000..24bd4a6a
--- /dev/null
+++ b/tests/unit/test_attestable.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import atr.attestable as attestable
+import atr.models.attestable as models
+
+
+def test_hash_entry_basenames_round_trip():
+    entry = models.HashEntry(
+        size=123,
+        uploaders=[("alice", "00001")],
+        basenames=["apache-widget-1.0-src.tar.gz"],
+    )
+
+    loaded = models.HashEntry.model_validate_json(entry.model_dump_json())
+
+    assert loaded == entry
+    assert loaded.basenames == ["apache-widget-1.0-src.tar.gz"]
+
+
+def test_hash_metadata_basenames_are_cumulative_and_unique():
+    previous = models.AttestableV1(
+        paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+        hashes={
+            "h1": models.HashEntry(
+                size=100,
+                uploaders=[("alice", "00001")],
+                basenames=["apache-widget-1.0-src.tar.gz"],
+            )
+        },
+        policy={},
+    )
+    path_to_hash = {
+        "dist/apache-widget-1.0-src.tar.gz": "h1",
+        "dist/apache-widget-1.0.zip": "h1",
+        "other/apache-widget-1.0.zip": "h1",
+        "docs/readme.txt": "h2",
+    }
+    path_to_size = {
+        "dist/apache-widget-1.0-src.tar.gz": 100,
+        "dist/apache-widget-1.0.zip": 100,
+        "other/apache-widget-1.0.zip": 100,
+        "docs/readme.txt": 50,
+    }
+
+    data = attestable._generate_files_data(
+        path_to_hash=path_to_hash,
+        path_to_size=path_to_size,
+        revision_number="00002",
+        release_policy=None,
+        uploader_uid="bob",
+        previous=previous,
+    )
+
+    assert data.hashes["h1"].basenames == ["apache-widget-1.0-src.tar.gz", "apache-widget-1.0.zip"]
+    assert data.hashes["h1"].uploaders == [("alice", "00001"), ("bob", "00002")]
+    assert data.hashes["h2"].basenames == ["readme.txt"]
diff --git a/tests/unit/test_detection.py b/tests/unit/test_detection.py
new file mode 100644
index 00000000..7a89ee46
--- /dev/null
+++ b/tests/unit/test_detection.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import atr.detection as detection
+import atr.models.attestable as models
+
+
+def test_detect_archives_requiring_quarantine_known_hash_and_different_extension():
+    previous = models.AttestableV1(
+        paths={"dist/apache-widget-1.0-src.tgz": "h1"},
+        hashes={"h1": models.HashEntry(size=100, uploaders=[("alice", "00001")], basenames=["old-src.tgz"])},
+        policy={},
+    )
+
+    result = detection.detect_archives_requiring_quarantine(
+        path_to_hash={"dist/apache-widget-1.0.zip": "h1"},
+        previous_attestable=previous,
+    )
+
+    assert result == ["dist/apache-widget-1.0.zip"]
+
+
+def test_detect_archives_requiring_quarantine_known_hash_and_same_extension():
+    previous = models.AttestableV1(
+        paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+        hashes={
+            "h1": models.HashEntry(
+                size=100,
+                uploaders=[("alice", "00001")],
+                basenames=["apache-widget-0.9-src.tar.gz"],
+            )
+        },
+        policy={},
+    )
+
+    result = detection.detect_archives_requiring_quarantine(
+        path_to_hash={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+        previous_attestable=previous,
+    )
+
+    assert result == []
+
+
+def test_detect_archives_requiring_quarantine_missing_historical_basenames():
+    hash_entry = models.HashEntry(size=100, uploaders=[("alice", "00001")])
+    previous = models.AttestableV1(
+        paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+        hashes={"h1": hash_entry},
+        policy={},
+    )
+
+    result = detection.detect_archives_requiring_quarantine(
+        path_to_hash={"dist/apache-widget-1.1-src.tar.gz": "h1"},
+        previous_attestable=previous,
+    )
+
+    assert "basenames" not in hash_entry.model_fields_set
+    assert result == ["dist/apache-widget-1.1-src.tar.gz"]
+
+
+def test_detect_archives_requiring_quarantine_new_hash_new_extension():
+    previous = models.AttestableV1(
+        paths={"dist/apache-widget-1.0-src.tar.gz": "h_old"},
+        hashes={"h_old": models.HashEntry(size=100, uploaders=[("alice", "00001")], basenames=["old-src.tar.gz"])},
+        policy={},
+    )
+
+    result = detection.detect_archives_requiring_quarantine(
+        path_to_hash={"dist/apache-widget-1.1.zip": "h_new"},
+        previous_attestable=previous,
+    )
+
+    assert result == ["dist/apache-widget-1.1.zip"]
+
+
+def test_detect_archives_requiring_quarantine_no_previous_attestable():
+    result = detection.detect_archives_requiring_quarantine(
+        path_to_hash={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+        previous_attestable=None,
+    )
+
+    assert result == ["dist/apache-widget-1.0-src.tar.gz"]
+
+
+def test_detect_archives_requiring_quarantine_non_archive_files_are_ignored():
+    previous = models.AttestableV1(paths={}, hashes={}, policy={})
+
+    result = detection.detect_archives_requiring_quarantine(
+        path_to_hash={"dist/README.md": "h1", "dist/KEYS": "h2"},
+        previous_attestable=previous,
+    )
+
+    assert result == []
+
+
+def test_detect_archives_requiring_quarantine_tgz_and_tar_gz_are_equivalent():
+    previous = models.AttestableV1(
+        paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+        hashes={
+            "h1": models.HashEntry(
+                size=100,
+                uploaders=[("alice", "00001")],
+                basenames=["apache-widget-1.0-src.tar.gz"],
+            )
+        },
+        policy={},
+    )
+
+    result = detection.detect_archives_requiring_quarantine(
+        path_to_hash={"dist/apache-widget-1.0-src.tgz": "h1"},
+        previous_attestable=previous,
+    )
+
+    assert result == []


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to