This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch sbp
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/sbp by this push:
new bc3f4f19 Detect which files need to be quarantined
bc3f4f19 is described below
commit bc3f4f195d18961a7c8d1a937b7d4fe675dfe629
Author: Sean B. Palmer <[email protected]>
AuthorDate: Tue Feb 24 14:39:57 2026 +0000
Detect which files need to be quarantined
---
atr/attestable.py | 18 +++++-
atr/detection.py | 45 +++++++++++++++
atr/models/attestable.py | 1 +
tests/unit/test_attestable.py | 71 +++++++++++++++++++++++
tests/unit/test_detection.py | 128 ++++++++++++++++++++++++++++++++++++++++++
5 files changed, 261 insertions(+), 2 deletions(-)
diff --git a/atr/attestable.py b/atr/attestable.py
index 91a65470..11de4354 100644
--- a/atr/attestable.py
+++ b/atr/attestable.py
@@ -207,7 +207,7 @@ async def write_files_data(
        await f.write(models.AttestableChecksV2().model_dump_json(indent=2))
-def _compute_hashes_with_attribution(
+def _compute_hashes_with_attribution( # noqa: C901
current_hash_to_paths: dict[str, set[str]],
path_to_size: dict[str, int],
previous: models.AttestableV1 | None,
@@ -228,13 +228,23 @@ def _compute_hashes_with_attribution(
previous_paths = previous_hash_to_paths.get(hash_ref, set())
sample_path = next(iter(current_paths))
file_size = path_to_size[sample_path]
+        current_basenames = {_path_basename(path_key) for path_key in current_paths}
if hash_ref not in new_hashes:
new_hashes[hash_ref] = models.HashEntry(
size=file_size,
uploaders=[(uploader_uid, revision_number)],
+ basenames=sorted(current_basenames),
)
- elif len(current_paths) > len(previous_paths):
+ continue
+
+ existing_basenames = set(new_hashes[hash_ref].basenames)
+ for basename in sorted(current_basenames):
+ if basename not in existing_basenames:
+ new_hashes[hash_ref].basenames.append(basename)
+ existing_basenames.add(basename)
+
+ if len(current_paths) > len(previous_paths):
existing_entries = set(new_hashes[hash_ref].uploaders)
if (uploader_uid, revision_number) not in existing_entries:
+                new_hashes[hash_ref].uploaders.append((uploader_uid, revision_number))
@@ -263,3 +273,7 @@ def _generate_files_data(
hashes=dict(new_hashes),
policy=release_policy or {},
)
+
+
+def _path_basename(path_key: str) -> str:
+ return path_key.rsplit("/", maxsplit=1)[-1]
diff --git a/atr/detection.py b/atr/detection.py
index 0e52ce47..3f17195c 100644
--- a/atr/detection.py
+++ b/atr/detection.py
@@ -20,6 +20,8 @@ from typing import Final
import puremagic
+import atr.models.attestable as models
+
_BZIP2_TYPES: Final[set[str]] = {"application/x-bzip2"}
 _DEB_TYPES: Final[set[str]] = {"application/vnd.debian.binary-package", "application/x-archive"}
 _EXE_TYPES: Final[set[str]] = {"application/vnd.microsoft.portable-executable", "application/octet-stream"}
@@ -55,6 +57,37 @@ _EXPECTED: Final[dict[str, set[str]]] = {
}
_COMPOUND_SUFFIXES: Final = tuple(s for s in _EXPECTED if s.count(".") > 1)
+_QUARANTINE_ARCHIVE_SUFFIXES: Final[tuple[str, ...]] = (".tar.gz", ".tgz", ".zip")
+_QUARANTINE_NORMALISED_SUFFIXES: Final[dict[str, str]] = {".tgz": ".tar.gz"}
+
+
+def detect_archives_requiring_quarantine(
+    path_to_hash: dict[str, str], previous_attestable: models.AttestableV1 | None
+) -> list[str]:
+ quarantine_paths: list[str] = []
+ for path_key, hash_ref in path_to_hash.items():
+ basename = _path_basename(path_key)
+ suffix = _quarantine_archive_suffix(basename)
+ if suffix is None:
+ continue
+
+ if previous_attestable is None:
+ quarantine_paths.append(path_key)
+ continue
+
+ historical_hash_entry = previous_attestable.hashes.get(hash_ref)
+ if historical_hash_entry is None:
+ quarantine_paths.append(path_key)
+ continue
+
+ if "basenames" not in historical_hash_entry.model_fields_set:
+ quarantine_paths.append(path_key)
+ continue
+
+        if not any(_quarantine_archive_suffix(b) == suffix for b in historical_hash_entry.basenames):
+ quarantine_paths.append(path_key)
+
+ return quarantine_paths
def validate_directory(directory: pathlib.Path) -> list[str]:
@@ -70,6 +103,18 @@ def validate_directory(directory: pathlib.Path) -> list[str]:
return errors
+def _path_basename(path_key: str) -> str:
+ return path_key.rsplit("/", maxsplit=1)[-1]
+
+
+def _quarantine_archive_suffix(filename: str) -> str | None:
+ lower_name = filename.lower()
+ for suffix in _QUARANTINE_ARCHIVE_SUFFIXES:
+ if lower_name.endswith(suffix):
+ return _QUARANTINE_NORMALISED_SUFFIXES.get(suffix, suffix)
+ return None
+
+
def _suffix(filename: str) -> str:
name = filename.lower()
for compound in _COMPOUND_SUFFIXES:
diff --git a/atr/models/attestable.py b/atr/models/attestable.py
index 548a77ea..1e1bd7bb 100644
--- a/atr/models/attestable.py
+++ b/atr/models/attestable.py
@@ -25,6 +25,7 @@ from . import schema
class HashEntry(schema.Strict):
size: int
     uploaders: list[Annotated[tuple[str, str], pydantic.BeforeValidator(tuple)]]
+ basenames: list[str] = schema.factory(list)
class AttestableChecksV1(schema.Strict):
diff --git a/tests/unit/test_attestable.py b/tests/unit/test_attestable.py
new file mode 100644
index 00000000..24bd4a6a
--- /dev/null
+++ b/tests/unit/test_attestable.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import atr.attestable as attestable
+import atr.models.attestable as models
+
+
+def test_hash_entry_basenames_round_trip():
+ entry = models.HashEntry(
+ size=123,
+ uploaders=[("alice", "00001")],
+ basenames=["apache-widget-1.0-src.tar.gz"],
+ )
+
+ loaded = models.HashEntry.model_validate_json(entry.model_dump_json())
+
+ assert loaded == entry
+ assert loaded.basenames == ["apache-widget-1.0-src.tar.gz"]
+
+
+def test_hash_metadata_basenames_are_cumulative_and_unique():
+ previous = models.AttestableV1(
+ paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+ hashes={
+ "h1": models.HashEntry(
+ size=100,
+ uploaders=[("alice", "00001")],
+ basenames=["apache-widget-1.0-src.tar.gz"],
+ )
+ },
+ policy={},
+ )
+ path_to_hash = {
+ "dist/apache-widget-1.0-src.tar.gz": "h1",
+ "dist/apache-widget-1.0.zip": "h1",
+ "other/apache-widget-1.0.zip": "h1",
+ "docs/readme.txt": "h2",
+ }
+ path_to_size = {
+ "dist/apache-widget-1.0-src.tar.gz": 100,
+ "dist/apache-widget-1.0.zip": 100,
+ "other/apache-widget-1.0.zip": 100,
+ "docs/readme.txt": 50,
+ }
+
+ data = attestable._generate_files_data(
+ path_to_hash=path_to_hash,
+ path_to_size=path_to_size,
+ revision_number="00002",
+ release_policy=None,
+ uploader_uid="bob",
+ previous=previous,
+ )
+
+    assert data.hashes["h1"].basenames == ["apache-widget-1.0-src.tar.gz", "apache-widget-1.0.zip"]
+    assert data.hashes["h1"].uploaders == [("alice", "00001"), ("bob", "00002")]
+ assert data.hashes["h2"].basenames == ["readme.txt"]
diff --git a/tests/unit/test_detection.py b/tests/unit/test_detection.py
new file mode 100644
index 00000000..7a89ee46
--- /dev/null
+++ b/tests/unit/test_detection.py
@@ -0,0 +1,128 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import atr.detection as detection
+import atr.models.attestable as models
+
+
+def test_detect_archives_requiring_quarantine_known_hash_and_different_extension():
+ previous = models.AttestableV1(
+ paths={"dist/apache-widget-1.0-src.tgz": "h1"},
+        hashes={"h1": models.HashEntry(size=100, uploaders=[("alice", "00001")], basenames=["old-src.tgz"])},
+ policy={},
+ )
+
+ result = detection.detect_archives_requiring_quarantine(
+ path_to_hash={"dist/apache-widget-1.0.zip": "h1"},
+ previous_attestable=previous,
+ )
+
+ assert result == ["dist/apache-widget-1.0.zip"]
+
+
+def test_detect_archives_requiring_quarantine_known_hash_and_same_extension():
+ previous = models.AttestableV1(
+ paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+ hashes={
+ "h1": models.HashEntry(
+ size=100,
+ uploaders=[("alice", "00001")],
+ basenames=["apache-widget-0.9-src.tar.gz"],
+ )
+ },
+ policy={},
+ )
+
+ result = detection.detect_archives_requiring_quarantine(
+ path_to_hash={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+ previous_attestable=previous,
+ )
+
+ assert result == []
+
+
+def test_detect_archives_requiring_quarantine_missing_historical_basenames():
+ hash_entry = models.HashEntry(size=100, uploaders=[("alice", "00001")])
+ previous = models.AttestableV1(
+ paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+ hashes={"h1": hash_entry},
+ policy={},
+ )
+
+ result = detection.detect_archives_requiring_quarantine(
+ path_to_hash={"dist/apache-widget-1.1-src.tar.gz": "h1"},
+ previous_attestable=previous,
+ )
+
+ assert "basenames" not in hash_entry.model_fields_set
+ assert result == ["dist/apache-widget-1.1-src.tar.gz"]
+
+
+def test_detect_archives_requiring_quarantine_new_hash_new_extension():
+ previous = models.AttestableV1(
+ paths={"dist/apache-widget-1.0-src.tar.gz": "h_old"},
+        hashes={"h_old": models.HashEntry(size=100, uploaders=[("alice", "00001")], basenames=["old-src.tar.gz"])},
+ policy={},
+ )
+
+ result = detection.detect_archives_requiring_quarantine(
+ path_to_hash={"dist/apache-widget-1.1.zip": "h_new"},
+ previous_attestable=previous,
+ )
+
+ assert result == ["dist/apache-widget-1.1.zip"]
+
+
+def test_detect_archives_requiring_quarantine_no_previous_attestable():
+ result = detection.detect_archives_requiring_quarantine(
+ path_to_hash={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+ previous_attestable=None,
+ )
+
+ assert result == ["dist/apache-widget-1.0-src.tar.gz"]
+
+
+def test_detect_archives_requiring_quarantine_non_archive_files_are_ignored():
+ previous = models.AttestableV1(paths={}, hashes={}, policy={})
+
+ result = detection.detect_archives_requiring_quarantine(
+ path_to_hash={"dist/README.md": "h1", "dist/KEYS": "h2"},
+ previous_attestable=previous,
+ )
+
+ assert result == []
+
+
+def test_detect_archives_requiring_quarantine_tgz_and_tar_gz_are_equivalent():
+ previous = models.AttestableV1(
+ paths={"dist/apache-widget-1.0-src.tar.gz": "h1"},
+ hashes={
+ "h1": models.HashEntry(
+ size=100,
+ uploaders=[("alice", "00001")],
+ basenames=["apache-widget-1.0-src.tar.gz"],
+ )
+ },
+ policy={},
+ )
+
+ result = detection.detect_archives_requiring_quarantine(
+ path_to_hash={"dist/apache-widget-1.0-src.tgz": "h1"},
+ previous_attestable=previous,
+ )
+
+ assert result == []
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]