This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-atr-experiments.git
The following commit(s) were added to refs/heads/main by this push:
new 054d208 Move signature checks into their own tasks module
054d208 is described below
commit 054d20802b311bb2ab0851523e3bf6b6995589d4
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Mar 10 16:26:59 2025 +0200
Move signature checks into their own tasks module
---
atr/tasks/archive.py | 15 ++-----
atr/tasks/signature.py | 115 +++++++++++++++++++++++++++++++++++++++++++++++++
atr/tasks/task.py | 23 ++++++++--
atr/verify.py | 82 +----------------------------------
atr/worker.py | 28 +++++++-----
docs/conventions.md | 4 +-
6 files changed, 158 insertions(+), 109 deletions(-)
diff --git a/atr/tasks/archive.py b/atr/tasks/archive.py
index a05fe1d..ce200ed 100644
--- a/atr/tasks/archive.py
+++ b/atr/tasks/archive.py
@@ -25,7 +25,7 @@ import atr.tasks.task as task
_LOGGER = logging.getLogger(__name__)
-def check_integrity(args: list[str]) -> tuple[task.TaskStatus, str | None,
tuple[Any, ...]]:
+def check_integrity(args: list[str]) -> tuple[task.Status, str | None,
tuple[Any, ...]]:
"""Check the integrity of a .tar.gz file."""
# TODO: We should standardise the "ERROR" mechanism here in the data
# Then we can have a single task wrapper for all tasks
@@ -33,14 +33,14 @@ def check_integrity(args: list[str]) ->
tuple[task.TaskStatus, str | None, tuple
# First argument should be the path, second is optional chunk_size
path = args[0]
chunk_size = int(args[1]) if len(args) > 1 else 4096
- task_results = _wrap_results(_check_integrity_core(path, chunk_size))
+ task_results = task.results_as_tuple(_check_integrity_core(path,
chunk_size))
_LOGGER.info(f"Verified {args} and computed size {task_results[0]}")
return task.COMPLETED, None, task_results
-def check_structure(args: list[str]) -> tuple[task.TaskStatus, str | None,
tuple[Any, ...]]:
+def check_structure(args: list[str]) -> tuple[task.Status, str | None,
tuple[Any, ...]]:
"""Check the structure of a .tar.gz file."""
- task_results = _wrap_results(_check_structure_core(*args))
+ task_results = task.results_as_tuple(_check_structure_core(*args))
_LOGGER.info(f"Verified archive structure for {args}")
status = task.FAILED if not task_results[0]["valid"] else task.COMPLETED
error = task_results[0]["message"] if not task_results[0]["valid"] else
None
@@ -104,10 +104,3 @@ def _check_structure_core(tgz_path: str, filename: str) ->
dict[str, Any]:
}
return {"valid": True, "root_dirs": [root], "message": "Archive structure
is valid"}
-
-
-def _wrap_results(item: Any) -> tuple[Any, ...]:
- """Ensure that returned results are structured as a tuple."""
- if not isinstance(item, tuple):
- return (item,)
- return item
diff --git a/atr/tasks/signature.py b/atr/tasks/signature.py
new file mode 100644
index 0000000..74886fe
--- /dev/null
+++ b/atr/tasks/signature.py
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import logging
+import shutil
+import tempfile
+from collections.abc import Generator
+from typing import Any, BinaryIO, Final, cast
+
+import gnupg
+import sqlalchemy.sql as sql
+
+import atr.db as db
+import atr.db.models as models
+import atr.tasks.task as task
+
+_LOGGER = logging.getLogger(__name__)
+
+
+def check(args: list[str]) -> tuple[task.Status, str | None, tuple[Any, ...]]:
+ """Check a signature file."""
+ task_results = task.results_as_tuple(_check_core(*args))
+ _LOGGER.info(f"Verified {args} with result {task_results[0]}")
+ status = task.FAILED if task_results[0].get("error") else task.COMPLETED
+ error = task_results[0].get("error")
+ return status, error, task_results
+
+
+def _check_core(pmc_name: str, artifact_path: str, signature_path: str) ->
dict[str, Any]:
+ """Verify a signature file using the PMC's public signing keys."""
+ # Query only the signing keys associated with this PMC
+ # TODO: Rename create_sync_db_session to create_session_sync
+ with db.create_sync_db_session() as session:
+ from sqlalchemy.sql.expression import ColumnElement
+
+ statement = (
+ sql.select(models.PublicSigningKey)
+ .join(models.PMCKeyLink)
+ .join(models.PMC)
+ .where(cast(ColumnElement[bool], models.PMC.project_name ==
pmc_name))
+ )
+ result = session.execute(statement)
+ public_keys = [key.ascii_armored_key for key in result.scalars().all()]
+
+ with open(signature_path, "rb") as sig_file:
+ return _signature_gpg_file(sig_file, artifact_path, public_keys)
+
+
+@contextlib.contextmanager
+def _ephemeral_gpg_home() -> Generator[str]:
+ # TODO: Deduplicate, and move somewhere more appropriate
+ """Create a temporary directory for an isolated GPG home, and clean it up
on exit."""
+ temp_dir = tempfile.mkdtemp(prefix="gpg-")
+ try:
+ yield temp_dir
+ finally:
+ shutil.rmtree(temp_dir)
+
+
+def _signature_gpg_file(sig_file: BinaryIO, artifact_path: str,
ascii_armored_keys: list[str]) -> dict[str, Any]:
+ """Verify a GPG signature for a file."""
+ with _ephemeral_gpg_home() as gpg_home:
+ gpg: Final[gnupg.GPG] = gnupg.GPG(gnupghome=gpg_home)
+
+ # Import all PMC public signing keys
+ for key in ascii_armored_keys:
+ import_result = gpg.import_keys(key)
+ if not import_result.fingerprints:
+ # TODO: Log warning about invalid key?
+ continue
+ verified = gpg.verify_file(sig_file, str(artifact_path))
+
+ # Collect all available information for debugging
+ debug_info = {
+ "key_id": verified.key_id or "Not available",
+ "fingerprint": verified.fingerprint.lower() if verified.fingerprint
else "Not available",
+ "pubkey_fingerprint": verified.pubkey_fingerprint.lower() if
verified.pubkey_fingerprint else "Not available",
+ "creation_date": verified.creation_date or "Not available",
+ "timestamp": verified.timestamp or "Not available",
+ "username": verified.username or "Not available",
+ "status": verified.status or "Not available",
+ "valid": bool(verified),
+ "trust_level": verified.trust_level if hasattr(verified,
"trust_level") else "Not available",
+ "trust_text": verified.trust_text if hasattr(verified, "trust_text")
else "Not available",
+ "stderr": verified.stderr if hasattr(verified, "stderr") else "Not
available",
+ "num_pmc_keys": len(ascii_armored_keys),
+ }
+
+ if not verified:
+ raise task.Error("No valid signature found", debug_info)
+
+ return {
+ "verified": True,
+ "key_id": verified.key_id,
+ "timestamp": verified.timestamp,
+ "username": verified.username or "Unknown",
+ "email": verified.pubkey_fingerprint.lower() or "Unknown",
+ "status": "Valid signature",
+ "debug_info": debug_info,
+ }
diff --git a/atr/tasks/task.py b/atr/tasks/task.py
index 4a14900..5b71191 100644
--- a/atr/tasks/task.py
+++ b/atr/tasks/task.py
@@ -16,13 +16,28 @@
# under the License.
from enum import Enum
-from typing import Final, Literal
+from typing import Any, Final, Literal
-class TaskStatus(Enum):
+class Status(Enum):
COMPLETED = "completed"
FAILED = "failed"
-COMPLETED: Final[Literal[TaskStatus.COMPLETED]] = TaskStatus.COMPLETED
-FAILED: Final[Literal[TaskStatus.FAILED]] = TaskStatus.FAILED
+COMPLETED: Final[Literal[Status.COMPLETED]] = Status.COMPLETED
+FAILED: Final[Literal[Status.FAILED]] = Status.FAILED
+
+
+class Error(Exception):
+ """Error during task execution."""
+
+ def __init__(self, message: str, *result: Any) -> None:
+ self.message = message
+ self.result = tuple(result)
+
+
+def results_as_tuple(item: Any) -> tuple[Any, ...]:
+ """Ensure that returned results are structured as a tuple."""
+ if not isinstance(item, tuple):
+ return (item,)
+ return item
diff --git a/atr/verify.py b/atr/verify.py
index 635f8c4..a7fdd7f 100644
--- a/atr/verify.py
+++ b/atr/verify.py
@@ -18,21 +18,13 @@
import logging
import os
import re
-import shutil
import subprocess
import tarfile
import tempfile
import xml.etree.ElementTree as ET
-from collections.abc import Generator
-from contextlib import contextmanager
-from typing import Any, BinaryIO, cast
-
-import gnupg
-from sqlalchemy.sql import select
+from typing import Any
from atr.config import get_config
-from atr.db import create_sync_db_session
-from atr.db.models import PMC, PMCKeyLink, PublicSigningKey
_LOGGER = logging.getLogger(__name__)
@@ -194,78 +186,6 @@ def license_files(artifact_path: str) -> dict[str, Any]:
}
-def signature(pmc_name: str, artifact_path: str, signature_path: str) ->
dict[str, Any]:
- """Verify a signature file using the PMC's public signing keys."""
- # Query only the signing keys associated with this PMC
- with create_sync_db_session() as session:
- from sqlalchemy.sql.expression import ColumnElement
-
- statement = (
- select(PublicSigningKey)
- .join(PMCKeyLink)
- .join(PMC)
- .where(cast(ColumnElement[bool], PMC.project_name == pmc_name))
- )
- result = session.execute(statement)
- public_keys = [key.ascii_armored_key for key in result.scalars().all()]
-
- with open(signature_path, "rb") as sig_file:
- return signature_gpg_file(sig_file, artifact_path, public_keys)
-
-
-def signature_gpg_file(sig_file: BinaryIO, artifact_path: str,
ascii_armored_keys: list[str]) -> dict[str, Any]:
- """Verify a GPG signature for a file."""
-
- @contextmanager
- def ephemeral_gpg_home() -> Generator[str]:
- """Create a temporary directory for an isolated GPG home, and clean it
up on exit."""
- temp_dir = tempfile.mkdtemp(prefix="gpg-")
- try:
- yield temp_dir
- finally:
- shutil.rmtree(temp_dir)
-
- with ephemeral_gpg_home() as gpg_home:
- gpg = gnupg.GPG(gnupghome=gpg_home)
-
- # Import all PMC public signing keys
- for key in ascii_armored_keys:
- import_result = gpg.import_keys(key)
- if not import_result.fingerprints:
- # TODO: Log warning about invalid key?
- continue
- verified = gpg.verify_file(sig_file, str(artifact_path))
-
- # Collect all available information for debugging
- debug_info = {
- "key_id": verified.key_id or "Not available",
- "fingerprint": verified.fingerprint.lower() if verified.fingerprint
else "Not available",
- "pubkey_fingerprint": verified.pubkey_fingerprint.lower() if
verified.pubkey_fingerprint else "Not available",
- "creation_date": verified.creation_date or "Not available",
- "timestamp": verified.timestamp or "Not available",
- "username": verified.username or "Not available",
- "status": verified.status or "Not available",
- "valid": bool(verified),
- "trust_level": verified.trust_level if hasattr(verified,
"trust_level") else "Not available",
- "trust_text": verified.trust_text if hasattr(verified, "trust_text")
else "Not available",
- "stderr": verified.stderr if hasattr(verified, "stderr") else "Not
available",
- "num_pmc_keys": len(ascii_armored_keys),
- }
-
- if not verified:
- raise VerifyError("No valid signature found", debug_info)
-
- return {
- "verified": True,
- "key_id": verified.key_id,
- "timestamp": verified.timestamp,
- "username": verified.username or "Unknown",
- "email": verified.pubkey_fingerprint.lower() or "Unknown",
- "status": "Valid signature",
- "debug_info": debug_info,
- }
-
-
# File type comment style definitions
# Ordered by their popularity in the Stack Overflow Developer Survey 2024
COMMENT_STYLES = {
diff --git a/atr/worker.py b/atr/worker.py
index d01d1c2..516f53e 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -38,6 +38,7 @@ from sqlalchemy import text
import atr.tasks.archive as archive
import atr.tasks.bulk as bulk
import atr.tasks.mailtest as mailtest
+import atr.tasks.signature as signature
import atr.tasks.task as task
import atr.tasks.vote as vote
import atr.verify as verify
@@ -87,6 +88,20 @@ def setup_logging() -> None:
def task_error_handle(task_id: int, e: Exception) -> None:
"""Handle task error by updating the database with error information."""
if isinstance(e, verify.VerifyError):
+ # VerifyError is deprecated, use task.Error instead
+ _LOGGER.error(f"Task {task_id} failed: {e.message}")
+ result = json.dumps(e.result)
+ with create_sync_db_session() as session:
+ with session.begin():
+ session.execute(
+ text("""
+ UPDATE task
+ SET status = 'FAILED', completed = :now, error =
:error, result = :result
+ WHERE id = :task_id
+ """),
+ {"now": datetime.datetime.now(UTC), "task_id": task_id,
"error": e.message, "result": result},
+ )
+ elif isinstance(e, task.Error):
_LOGGER.error(f"Task {task_id} failed: {e.message}")
result = json.dumps(e.result)
with create_sync_db_session() as session:
@@ -188,15 +203,6 @@ def task_verify_license_files(args: list[str]) ->
tuple[str, str | None, tuple[A
return status, error, task_results
-def task_verify_signature(args: list[str]) -> tuple[str, str | None,
tuple[Any, ...]]:
- """Process verify_signature task."""
- task_results = task_process_wrap(verify.signature(*args))
- _LOGGER.info(f"Verified {args} with result {task_results[0]}")
- status = "FAILED" if task_results[0].get("error") else "COMPLETED"
- error = task_results[0].get("error")
- return status, error, task_results
-
-
def task_verify_license_headers(args: list[str]) -> tuple[str, str | None,
tuple[Any, ...]]:
"""Process verify_license_headers task."""
task_results = task_process_wrap(verify.license_header_verify(*args))
@@ -387,7 +393,7 @@ def task_process(task_id: int, task_type: str, task_args:
str) -> None:
"verify_archive_integrity": archive.check_integrity,
"verify_archive_structure": archive.check_structure,
"verify_license_files": task_verify_license_files,
- "verify_signature": task_verify_signature,
+ "verify_signature": signature.check,
"verify_license_headers": task_verify_license_headers,
"verify_rat_license": task_verify_rat_license,
"generate_cyclonedx_sbom": task_generate_cyclonedx_sbom,
@@ -403,7 +409,7 @@ def task_process(task_id: int, task_type: str, task_args:
str) -> None:
raise Exception(msg)
raw_status, error, task_results = handler(args)
- if isinstance(raw_status, task.TaskStatus):
+ if isinstance(raw_status, task.Status):
status = raw_status.value
elif isinstance(raw_status, str):
status = raw_status
diff --git a/docs/conventions.md b/docs/conventions.md
index bfe7efb..61ac911 100644
--- a/docs/conventions.md
+++ b/docs/conventions.md
@@ -28,7 +28,7 @@ Do not use uppercase for constants within functions and
methods.
### Use the `Final` type with all constants
-This pattern must be followed for top level constants, and should be followed
for function and method level constants too.
+This pattern must be followed for top level constants, and should be followed
for function and method level constants too. The longer the function, the more
important the use of `Final`.
### Prefix global variables with `global_`
@@ -70,7 +70,7 @@ from p.q.r import s
s()
```
-The typing module is an exception to this rule. Always import typing
identifiers directly using the `from` syntax:
+The collections.abc and typing modules are an exception to this rule. Always
import typing identifiers directly using the `from` syntax:
```python
# Preferred
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]