This is an automated email from the ASF dual-hosted git repository.
sbp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-release.git
The following commit(s) were added to refs/heads/main by this push:
new a2ddc73 Convert archive tasks to check result style
a2ddc73 is described below
commit a2ddc73e9f6337e7ea69f67d96dff21ee60bd565
Author: Sean B. Palmer <[email protected]>
AuthorDate: Mon Mar 31 14:58:18 2025 +0100
Convert archive tasks to check result style
---
atr/routes/draft.py | 23 +++---
atr/tasks/__init__.py | 15 ++--
atr/tasks/archive.py | 126 ------------------------------
atr/tasks/{task.py => checks/__init__.py} | 69 +++++++++++-----
atr/tasks/checks/archive.py | 120 ++++++++++++++++++++++++++++
atr/tasks/license.py | 2 +-
atr/tasks/rat.py | 2 +-
atr/tasks/sbom.py | 2 +-
atr/tasks/task.py | 83 +-------------------
atr/templates/draft-review.html | 53 +++----------
atr/worker.py | 35 +++++++--
11 files changed, 233 insertions(+), 297 deletions(-)
diff --git a/atr/routes/draft.py b/atr/routes/draft.py
index c1184b6..c2559b0 100644
--- a/atr/routes/draft.py
+++ b/atr/routes/draft.py
@@ -488,15 +488,15 @@ async def review(session: routes.CommitterSession,
project_name: str, version_na
base_path = util.get_release_candidate_draft_dir() / project_name /
version_name
paths = await util.paths_recursive(base_path)
- paths_set = set(paths)
+ # paths_set = set(paths)
path_templates = {}
path_substitutions = {}
path_artifacts = set()
path_metadata = set()
+ path_modified = {}
+ path_successes = {}
path_warnings = {}
path_errors = {}
- path_modified = {}
- path_tasks: dict[pathlib.Path, dict[str, models.Task]] = {}
for path in paths:
# Get template and substitutions
elements = {
@@ -523,15 +523,20 @@ async def review(session: routes.CommitterSession,
project_name: str, version_na
elif ext_metadata:
path_metadata.add(path)
- # Get warnings and errors
- path_warnings[path], path_errors[path] =
_path_warnings_errors(paths_set, path, ext_artifact, ext_metadata)
-
# Get modified time
full_path = str(util.get_release_candidate_draft_dir() / project_name
/ version_name / path)
path_modified[path] = int(await aiofiles.os.path.getmtime(full_path))
- # Get the most recent task for each type
- path_tasks[path] = await db.recent_tasks(data,
f"{project_name}-{version_name}", str(path), path_modified[path])
+ # Get successes, warnings, and errors
+ path_successes[path] = await data.check_result(
+ release_name=f"{project_name}-{version_name}", path=str(path),
status=models.CheckResultStatus.SUCCESS
+ ).all()
+ path_warnings[path] = await data.check_result(
+ release_name=f"{project_name}-{version_name}", path=str(path),
status=models.CheckResultStatus.WARNING
+ ).all()
+ path_errors[path] = await data.check_result(
+ release_name=f"{project_name}-{version_name}", path=str(path),
status=models.CheckResultStatus.FAILURE
+ ).all()
return await quart.render_template(
"draft-review.html",
@@ -545,10 +550,10 @@ async def review(session: routes.CommitterSession,
project_name: str, version_na
substitutions=path_substitutions,
artifacts=path_artifacts,
metadata=path_metadata,
+ successes=path_successes,
warnings=path_warnings,
errors=path_errors,
modified=path_modified,
- tasks=path_tasks,
models=models,
)
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 0fa211a..f1ea785 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -15,12 +15,11 @@
# specific language governing permissions and limitations
# under the License.
-import os.path
-
import aiofiles.os
import atr.db.models as models
-import atr.tasks.archive as archive
+import atr.tasks.checks as checks
+import atr.tasks.checks.archive as archive
import atr.tasks.hashing as hashing
import atr.util as util
@@ -91,22 +90,22 @@ async def sha_checks(release: models.Release, hash_file:
str) -> list[models.Tas
async def tar_gz_checks(release: models.Release, path: str, signature_path:
str | None = None) -> list[models.Task]:
# TODO: We should probably use an enum for task_type
full_path = str(util.get_release_candidate_draft_dir() /
release.project.name / release.version / path)
- filename = os.path.basename(path)
+ # filename = os.path.basename(path)
modified = int(await aiofiles.os.path.getmtime(full_path))
tasks = [
models.Task(
status=models.TaskStatus.QUEUED,
- task_type="verify_archive_integrity",
- task_args=archive.CheckIntegrity(path=full_path).model_dump(),
+ task_type=checks.function_key(archive.integrity),
+ task_args=archive.Integrity(release_name=release.name,
abs_path=full_path).model_dump(),
release_name=release.name,
path=path,
modified=modified,
),
models.Task(
status=models.TaskStatus.QUEUED,
- task_type="verify_archive_structure",
- task_args=[full_path, filename],
+ task_type=checks.function_key(archive.structure),
+ task_args=archive.Structure(release_name=release.name,
abs_path=full_path).model_dump(),
release_name=release.name,
path=path,
modified=modified,
diff --git a/atr/tasks/archive.py b/atr/tasks/archive.py
deleted file mode 100644
index 6e70b67..0000000
--- a/atr/tasks/archive.py
+++ /dev/null
@@ -1,126 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-
-import asyncio
-import logging
-import os.path
-import tarfile
-from typing import Any, Final
-
-import pydantic
-
-import atr.db.models as models
-import atr.tasks.task as task
-import atr.util as util
-
-_LOGGER: Final = logging.getLogger(__name__)
-
-
-class CheckIntegrity(pydantic.BaseModel):
- """Parameters for archive integrity checking."""
-
- path: str = pydantic.Field(..., description="Path to the .tar.gz file to
check")
- chunk_size: int = pydantic.Field(default=4096, description="Size of chunks
to read when checking the file")
-
-
-async def check_integrity(args: dict[str, Any]) -> tuple[models.TaskStatus,
str | None, tuple[Any, ...]]:
- """Check the integrity of a .tar.gz file."""
- # TODO: We should standardise the "ERROR" mechanism here in the data
- # Then we can have a single task wrapper for all tasks
- # TODO: We should use task.TaskError as standard, and maybe typeguard each
function
- data = CheckIntegrity(**args)
- # TODO: Check arguments should have release_name and path as standard
- # Followed by any necessary additional arguments
- release_name, rel_path = util.abs_path_to_release_and_rel_path(data.path)
- check = await task.Check.create(checker=check_integrity,
release_name=release_name, path=rel_path)
- try:
- size = await asyncio.to_thread(_check_integrity_core, data.path,
data.chunk_size)
- await check.success("Able to read all entries of the archive using
tarfile", {"size": size})
- except Exception as e:
- await check.failure("Unable to read all entries of the archive using
tarfile", {"error": str(e)})
- return task.FAILED, str(e), ()
- task_results = task.results_as_tuple(await
asyncio.to_thread(_check_integrity_core, data.path, data.chunk_size))
- _LOGGER.info(f"Verified {data.path} and computed size {task_results[0]}")
- return task.COMPLETED, None, task_results
-
-
-def check_structure(args: list[str]) -> tuple[models.TaskStatus, str | None,
tuple[Any, ...]]:
- """Check the structure of a .tar.gz file."""
- task_results = task.results_as_tuple(_check_structure_core(*args))
- _LOGGER.info(f"Verified archive structure for {args}")
- status = task.FAILED if not task_results[0]["valid"] else task.COMPLETED
- error = task_results[0]["message"] if not task_results[0]["valid"] else
None
- return status, error, task_results
-
-
-def root_directory(tgz_path: str) -> str:
- """Find the root directory in a tar archive and validate that it has only
one root dir."""
- root = None
-
- with tarfile.open(tgz_path, mode="r|gz") as tf:
- for member in tf:
- parts = member.name.split("/", 1)
- if len(parts) >= 1:
- if not root:
- root = parts[0]
- elif parts[0] != root:
- raise task.Error(f"Multiple root directories found:
{root}, {parts[0]}")
-
- if not root:
- raise task.Error("No root directory found in archive")
-
- return root
-
-
-def _check_integrity_core(tgz_path: str, chunk_size: int = 4096) -> int:
- """Verify a .tar.gz file and compute its uncompressed size."""
- total_size = 0
-
- with tarfile.open(tgz_path, mode="r|gz") as tf:
- for member in tf:
- total_size += member.size
- # Verify file by extraction
- if member.isfile():
- f = tf.extractfile(member)
- if f is not None:
- while True:
- data = f.read(chunk_size)
- if not data:
- break
- return total_size
-
-
-def _check_structure_core(tgz_path: str, filename: str) -> dict[str, Any]:
- """
- Verify that the archive contains exactly one root directory named after
the package.
- The package name should match the archive filename without the .tar.gz
extension.
- """
- expected_root: Final[str] =
os.path.splitext(os.path.splitext(filename)[0])[0]
-
- try:
- root = root_directory(tgz_path)
- except ValueError as e:
- return {"valid": False, "root_dirs": [], "message": str(e)}
-
- if root != expected_root:
- return {
- "valid": False,
- "root_dirs": [root],
- "message": f"Root directory '{root}' does not match expected name
'{expected_root}'",
- }
-
- return {"valid": True, "root_dirs": [root], "message": "Archive structure
is valid"}
diff --git a/atr/tasks/task.py b/atr/tasks/checks/__init__.py
similarity index 69%
copy from atr/tasks/task.py
copy to atr/tasks/checks/__init__.py
index 833e48a..48da530 100644
--- a/atr/tasks/task.py
+++ b/atr/tasks/checks/__init__.py
@@ -18,35 +18,30 @@
from __future__ import annotations
import datetime
-from typing import TYPE_CHECKING, Any, Final
+import pathlib
+from functools import wraps
+from typing import TYPE_CHECKING, Any, TypeVar
+import pydantic
import sqlmodel
-import atr.db as db
-import atr.db.models as models
-
if TYPE_CHECKING:
- from collections.abc import Callable
-
-QUEUED: Final = models.TaskStatus.QUEUED
-ACTIVE: Final = models.TaskStatus.ACTIVE
-COMPLETED: Final = models.TaskStatus.COMPLETED
-FAILED: Final = models.TaskStatus.FAILED
+ from collections.abc import Awaitable, Callable
+import atr.config as config
+import atr.db as db
+import atr.db.models as models
-class Error(Exception):
- """Error during task execution."""
- def __init__(self, message: str, *result: Any) -> None:
- self.message = message
- self.result = tuple(result)
+def function_key(func: Callable[..., Any]) -> str:
+ return func.__module__ + "." + func.__name__
class Check:
def __init__(
self, checker: Callable[..., Any], release_name: str, path: str | None
= None, afresh: bool = True
) -> None:
- self.checker = checker.__module__ + "." + checker.__name__
+ self.checker = function_key(checker)
self.release_name = release_name
self.path = path
self.afresh = afresh
@@ -116,8 +111,40 @@ class Check:
return await self._add(models.CheckResultStatus.WARNING, message,
data, path=path)
-def results_as_tuple(item: Any) -> tuple[Any, ...]:
- """Ensure that returned results are structured as a tuple."""
- if not isinstance(item, tuple):
- return (item,)
- return item
+def rel_path(abs_path: str) -> str:
+ """Return the relative path for a given absolute path."""
+ conf = config.get()
+ phase_dir = pathlib.Path(conf.PHASE_STORAGE_DIR)
+ phase_sub_dir = pathlib.Path(abs_path).relative_to(phase_dir)
+ # Skip the first component, which is the phase name
+ # And the next two components, which are the project name and version name
+ return str(pathlib.Path(*phase_sub_dir.parts[3:]))
+
+
+# def using(cls: type[pydantic.BaseModel]) -> Callable[[Callable[..., Any]],
Callable[..., Any]]:
+# """Decorator to specify the parameters for a check."""
+
+# def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
+# @wraps(func)
+# async def wrapper(data_dict: dict[str, Any], *args: Any, **kwargs:
Any) -> Any:
+# model_instance = cls(**data_dict)
+# return await func(model_instance, *args, **kwargs)
+# return wrapper
+
+# return decorator
+
+
+T = TypeVar("T", bound=pydantic.BaseModel)
+R = TypeVar("R")
+
+
+def with_model(model_class: type[T]) -> Callable[[Callable[...,
Awaitable[R]]], Callable[..., Awaitable[R]]]:
+ def decorator(func: Callable[..., Awaitable[R]]) -> Callable[...,
Awaitable[R]]:
+ @wraps(func)
+ async def wrapper(data_dict: dict[str, Any], *args: Any, **kwargs:
Any) -> R:
+ model_instance = model_class(**data_dict)
+ return await func(model_instance, *args, **kwargs)
+
+ return wrapper
+
+ return decorator
diff --git a/atr/tasks/checks/archive.py b/atr/tasks/checks/archive.py
new file mode 100644
index 0000000..f25665a
--- /dev/null
+++ b/atr/tasks/checks/archive.py
@@ -0,0 +1,120 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import asyncio
+import logging
+import os.path
+import tarfile
+from typing import Final
+
+import pydantic
+
+import atr.tasks.checks as checks
+
+_LOGGER: Final = logging.getLogger(__name__)
+
+
+class Integrity(pydantic.BaseModel):
+ """Parameters for archive integrity checking."""
+
+ release_name: str = pydantic.Field(..., description="Release name")
+ abs_path: str = pydantic.Field(..., description="Absolute path to the
.tar.gz file to check")
+ chunk_size: int = pydantic.Field(default=4096, description="Size of chunks
to read when checking the file")
+
+
[email protected]_model(Integrity)
+async def integrity(args: Integrity) -> str | None:
+ """Check the integrity of a .tar.gz file."""
+ rel_path = checks.rel_path(args.abs_path)
+ _LOGGER.info(f"ABS, REL: {args.abs_path} {rel_path}")
+ check = await checks.Check.create(checker=integrity,
release_name=args.release_name, path=rel_path)
+ try:
+ size = await asyncio.to_thread(_integrity_core, args.abs_path,
args.chunk_size)
+ await check.success("Able to read all entries of the archive using
tarfile", {"size": size})
+ except Exception as e:
+ await check.failure("Unable to read all entries of the archive using
tarfile", {"error": str(e)})
+ return None
+
+
+class Structure(pydantic.BaseModel):
+ """Parameters for archive structure checking."""
+
+ release_name: str = pydantic.Field(..., description="Release name")
+ abs_path: str = pydantic.Field(..., description="Absolute path to the
.tar.gz file to check")
+
+
[email protected]_model(Structure)
+async def structure(args: Structure) -> str | None:
+ """Check the structure of a .tar.gz file."""
+ rel_path = checks.rel_path(args.abs_path)
+ check = await checks.Check.create(checker=structure,
release_name=args.release_name, path=rel_path)
+ filename = os.path.basename(args.abs_path)
+ expected_root: Final[str] =
os.path.splitext(os.path.splitext(filename)[0])[0]
+ _LOGGER.info(f"Checking structure for {args.abs_path} (expected root:
{expected_root})")
+
+ try:
+ root = await asyncio.to_thread(root_directory, args.abs_path)
+ if root == expected_root:
+ await check.success(
+ "Archive contains exactly one root directory matching the
expected name",
+ {"root": root, "expected": expected_root},
+ )
+ else:
+ await check.failure(
+ f"Root directory '{root}' does not match expected name
'{expected_root}'",
+ {"root": root, "expected": expected_root},
+ )
+ except Exception as e:
+ await check.failure("Unable to verify archive structure", {"error":
str(e)})
+ return None
+
+
+def root_directory(tgz_path: str) -> str:
+ """Find the root directory in a tar archive and validate that it has only
one root dir."""
+ root = None
+
+ with tarfile.open(tgz_path, mode="r|gz") as tf:
+ for member in tf:
+ parts = member.name.split("/", 1)
+ if len(parts) >= 1:
+ if not root:
+ root = parts[0]
+ elif parts[0] != root:
+ raise ValueError(f"Multiple root directories found:
{root}, {parts[0]}")
+
+ if not root:
+ raise ValueError("No root directory found in archive")
+
+ return root
+
+
+def _integrity_core(tgz_path: str, chunk_size: int = 4096) -> int:
+ """Verify a .tar.gz file and compute its uncompressed size."""
+ total_size = 0
+
+ with tarfile.open(tgz_path, mode="r|gz") as tf:
+ for member in tf:
+ total_size += member.size
+ # Verify file by extraction
+ if member.isfile():
+ f = tf.extractfile(member)
+ if f is not None:
+ while True:
+ data = f.read(chunk_size)
+ if not data:
+ break
+ return total_size
diff --git a/atr/tasks/license.py b/atr/tasks/license.py
index 4699a10..078df70 100644
--- a/atr/tasks/license.py
+++ b/atr/tasks/license.py
@@ -22,7 +22,7 @@ import tarfile
from typing import Any, Final
import atr.db.models as models
-import atr.tasks.archive as archive
+import atr.tasks.checks.archive as archive
import atr.tasks.task as task
_LOGGER = logging.getLogger(__name__)
diff --git a/atr/tasks/rat.py b/atr/tasks/rat.py
index 87b73bd..ffc0e4b 100644
--- a/atr/tasks/rat.py
+++ b/atr/tasks/rat.py
@@ -24,7 +24,7 @@ from typing import Any, Final
import atr.config as config
import atr.db.models as models
-import atr.tasks.archive as archive
+import atr.tasks.checks.archive as archive
import atr.tasks.sbom as sbom
import atr.tasks.task as task
diff --git a/atr/tasks/sbom.py b/atr/tasks/sbom.py
index 18a7d72..b21356e 100644
--- a/atr/tasks/sbom.py
+++ b/atr/tasks/sbom.py
@@ -22,7 +22,7 @@ from typing import Any, Final
import atr.config as config
import atr.db.models as models
-import atr.tasks.archive as archive
+import atr.tasks.checks.archive as archive
import atr.tasks.task as task
_CONFIG: Final = config.get()
diff --git a/atr/tasks/task.py b/atr/tasks/task.py
index 833e48a..f73add5 100644
--- a/atr/tasks/task.py
+++ b/atr/tasks/task.py
@@ -17,17 +17,10 @@
from __future__ import annotations
-import datetime
-from typing import TYPE_CHECKING, Any, Final
+from typing import Any, Final
-import sqlmodel
-
-import atr.db as db
import atr.db.models as models
-if TYPE_CHECKING:
- from collections.abc import Callable
-
QUEUED: Final = models.TaskStatus.QUEUED
ACTIVE: Final = models.TaskStatus.ACTIVE
COMPLETED: Final = models.TaskStatus.COMPLETED
@@ -42,80 +35,6 @@ class Error(Exception):
self.result = tuple(result)
-class Check:
- def __init__(
- self, checker: Callable[..., Any], release_name: str, path: str | None
= None, afresh: bool = True
- ) -> None:
- self.checker = checker.__module__ + "." + checker.__name__
- self.release_name = release_name
- self.path = path
- self.afresh = afresh
- self._constructed = False
-
- @classmethod
- async def create(
- cls, checker: Callable[..., Any], release_name: str, path: str | None
= None, afresh: bool = True
- ) -> Check:
- check = cls(checker, release_name, path, afresh)
- if afresh is True:
- # Clear outer path whether it's specified or not
- await check._clear(path)
- check._constructed = True
- return check
-
- async def _add(
- self, status: models.CheckResultStatus, message: str, data: Any, path:
str | None = None
- ) -> models.CheckResult:
- if self._constructed is False:
- raise RuntimeError("Cannot add check result to a check that has
not been constructed")
- if path is not None:
- if self.path is not None:
- raise ValueError("Cannot specify path twice")
- if self.afresh is True:
- # Clear inner path only if it's specified
- await self._clear(path)
-
- result = models.CheckResult(
- release_name=self.release_name,
- checker=self.checker,
- path=path or self.path,
- created=datetime.datetime.now(),
- status=status,
- message=message,
- data=data,
- )
-
- # It would be more efficient to keep a session open
- # But, we prefer in this case to maintain a simpler interface
- # If performance is unacceptable, we can revisit this design
- async with db.session() as session:
- session.add(result)
- await session.commit()
- return result
-
- async def _clear(self, path: str | None = None) -> None:
- async with db.session() as data:
- stmt = sqlmodel.delete(models.CheckResult).where(
-
db.validate_instrumented_attribute(models.CheckResult.release_name) ==
self.release_name,
- db.validate_instrumented_attribute(models.CheckResult.checker)
== self.checker,
- db.validate_instrumented_attribute(models.CheckResult.path) ==
path,
- )
- await data.execute(stmt)
- await data.commit()
-
- async def exception(self, message: str, data: Any, path: str | None =
None) -> models.CheckResult:
- return await self._add(models.CheckResultStatus.EXCEPTION, message,
data, path=path)
-
- async def failure(self, message: str, data: Any, path: str | None = None)
-> models.CheckResult:
- return await self._add(models.CheckResultStatus.FAILURE, message,
data, path=path)
-
- async def success(self, message: str, data: Any, path: str | None = None)
-> models.CheckResult:
- return await self._add(models.CheckResultStatus.SUCCESS, message,
data, path=path)
-
- async def warning(self, message: str, data: Any, path: str | None = None)
-> models.CheckResult:
- return await self._add(models.CheckResultStatus.WARNING, message,
data, path=path)
-
-
def results_as_tuple(item: Any) -> tuple[Any, ...]:
"""Ensure that returned results are structured as a tuple."""
if not isinstance(item, tuple):
diff --git a/atr/templates/draft-review.html b/atr/templates/draft-review.html
index 1932df7..23d4024 100644
--- a/atr/templates/draft-review.html
+++ b/atr/templates/draft-review.html
@@ -74,53 +74,20 @@
{% endif %}
</td>
<td>
- {% set completed_count = namespace(value=0) %}
- {% set failed_count = namespace(value=0) %}
- {% set active_count = namespace(value=0) %}
- {% set queued_count = namespace(value=0) %}
-
- {% for task_type, task in tasks[path].items() %}
- {% if task.status.value == 'completed' %}
- {% set completed_count.value = completed_count.value +
1 %}
- {% elif task.status.value == 'failed' %}
- {% set failed_count.value = failed_count.value + 1 %}
- {% elif task.status.value == 'active' %}
- {% set active_count.value = active_count.value + 1 %}
- {% elif task.status.value == 'queued' %}
- {% set queued_count.value = queued_count.value + 1 %}
- {% endif %}
- {% endfor %}
-
- {% if completed_count.value > 0 or failed_count.value > 0
or active_count.value > 0 or queued_count.value > 0 %}
+ <!-- TODO: Show number of tasks which are currently
running -->
+ {% if successes[path] or warnings[path] or errors[path] %}
<div class="d-flex flex-wrap gap-2 mt-1 mb-2">
- {% if completed_count.value > 0 %}<span class="badge
bg-success">{{ completed_count.value }} Passed</span>{% endif %}
- {% if failed_count.value > 0 %}
- <span class="badge bg-danger">{{ failed_count.value
}} {{ "Issue" if failed_count.value == 1 else "Issues" }}</span>
+ {% if successes[path]|length > 0 %}
+ <span class="badge bg-success">{{
successes[path]|length }} Passed</span>
+ {% endif %}
+ {% if warnings[path]|length > 0 %}
+ <span class="badge bg-warning">{{
warnings[path]|length }} {{ "Issue" if warnings[path]|length == 1 else "Issues"
}}</span>
+ {% endif %}
+ {% if errors[path]|length > 0 %}
+ <span class="badge bg-danger">{{ errors[path]|length
}} {{ "Issue" if errors[path]|length == 1 else "Issues" }}</span>
{% endif %}
- {% if active_count.value > 0 %}<span class="badge
bg-info">{{ active_count.value }} Running</span>{% endif %}
- {% if queued_count.value > 0 %}<span class="badge
bg-secondary">{{ queued_count.value }} Pending</span>{% endif %}
</div>
{% endif %}
- {% if errors[path]|length > 0 %}
- <details>
- <summary>{{ errors[path]|length }} Errors</summary>
- {% for error in errors[path] %}
- <div class="alert alert-danger p-1 px-2 mt-0 mb-3">
- <i class="fas fa-exclamation-triangle"></i> {{
error }}
- </div>
- {% endfor %}
- </details>
- {% endif %}
- {% if warnings[path]|length > 0 %}
- <details>
- <summary>{{ warnings[path]|length }} Warnings</summary>
- {% for warning in warnings[path] %}
- <div class="alert alert-warning p-1 px-2 mt-0 mb-3">
- <i class="fas fa-exclamation-triangle"></i> {{
warning }}
- </div>
- {% endfor %}
- </details>
- {% endif %}
</td>
<td class="atr-sans">
<a href="{{ as_url(routes.download.phase,
phase='candidate-draft', project=release.project.name, version=release.version,
path=path) }}"
diff --git a/atr/worker.py b/atr/worker.py
index 929ce9f..c621960 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -31,13 +31,12 @@ import resource
import signal
import sys
import traceback
-from typing import Any, Final
+from typing import TYPE_CHECKING, Any, Final
import sqlmodel
import atr.db as db
import atr.db.models as models
-import atr.tasks.archive as archive
import atr.tasks.bulk as bulk
import atr.tasks.hashing as hashing
import atr.tasks.license as license
@@ -49,6 +48,9 @@ import atr.tasks.signature as signature
import atr.tasks.task as task
import atr.tasks.vote as vote
+if TYPE_CHECKING:
+ from collections.abc import Awaitable, Callable
+
_LOGGER: Final = logging.getLogger(__name__)
# Resource limits, 5 minutes and 1GB
@@ -179,12 +181,19 @@ async def _task_result_process(
async def _task_process(task_id: int, task_type: str, task_args: list[str] |
dict[str, Any]) -> None:
"""Process a claimed task."""
+ import atr.tasks.checks as checks
+ import atr.tasks.checks.archive as checks_archive
+
_LOGGER.info(f"Processing task {task_id} ({task_type}) with args
{task_args}")
try:
# Map task types to their handler functions
+ modern_task_handlers: dict[str, Callable[..., Awaitable[str | None]]]
= {
+ checks.function_key(checks_archive.integrity):
checks_archive.integrity,
+ checks.function_key(checks_archive.structure):
checks_archive.structure,
+ }
# TODO: We should use a decorator to register these automatically
dict_task_handlers = {
- "verify_archive_integrity": archive.check_integrity,
+ # "verify_archive_integrity": archive.check_integrity,
"package_bulk_download": bulk.download,
"rsync_analyse": rsync.analyse,
"verify_file_hash": hashing.check,
@@ -192,7 +201,7 @@ async def _task_process(task_id: int, task_type: str,
task_args: list[str] | dic
# TODO: These are synchronous
# We plan to convert these to async dict handlers
list_task_handlers = {
- "verify_archive_structure": archive.check_structure,
+ # "verify_archive_structure": archive.check_structure,
"verify_license_files": license.check_files,
"verify_signature": signature.check,
"verify_license_headers": license.check_headers,
@@ -202,7 +211,23 @@ async def _task_process(task_id: int, task_type: str,
task_args: list[str] | dic
"vote_initiate": vote.initiate,
}
- if isinstance(task_args, dict):
+ task_results: tuple[Any, ...]
+ if task_type in modern_task_handlers:
+ # NOTE: The other two branches below are deprecated
+ # This is transitional code, which we will tidy up significantly
+ handler = modern_task_handlers[task_type]
+ try:
+ handler_result = await handler(task_args)
+ task_results = tuple()
+ if handler_result is not None:
+ task_results = (handler_result,)
+ status = task.COMPLETED
+ error = None
+ except Exception as e:
+ task_results = tuple()
+ status = task.FAILED
+ error = str(e)
+ elif isinstance(task_args, dict):
dict_handler = dict_task_handlers.get(task_type)
if not dict_handler:
msg = f"Unknown task type: {task_type}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]