This is an automated email from the ASF dual-hosted git repository.
arm pushed a commit to branch arm
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/arm by this push:
new 3aedfa5c Check for running tasks as well as completed checks when using cache keys
3aedfa5c is described below
commit 3aedfa5c5b62857cbc4060c2f6611e72da5f7f70
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Feb 17 15:13:34 2026 +0000
Check for running tasks as well as completed checks when using cache keys
---
atr/db/__init__.py | 7 +++
atr/models/sql.py | 1 +
atr/tasks/__init__.py | 59 +++++++++++++++----------
migrations/versions/0050_2026.02.17_7406bb29.py | 29 ++++++++++++
4 files changed, 72 insertions(+), 24 deletions(-)
diff --git a/atr/db/__init__.py b/atr/db/__init__.py
index 2c6d579e..fb282628 100644
--- a/atr/db/__init__.py
+++ b/atr/db/__init__.py
@@ -670,8 +670,10 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession):
self,
id: Opt[int] = NOT_SET,
status: Opt[sql.TaskStatus] = NOT_SET,
+ status_in: Opt[list[sql.TaskStatus]] = NOT_SET,
task_type: Opt[str] = NOT_SET,
task_args: Opt[Any] = NOT_SET,
+ inputs_hash: Opt[str] = NOT_SET,
asf_uid: Opt[str] = NOT_SET,
added: Opt[datetime.datetime] = NOT_SET,
started: Opt[datetime.datetime | None] = NOT_SET,
@@ -687,14 +689,19 @@ class Session(sqlalchemy.ext.asyncio.AsyncSession):
) -> Query[sql.Task]:
query = sqlmodel.select(sql.Task)
+ via = sql.validate_instrumented_attribute
if is_defined(id):
query = query.where(sql.Task.id == id)
if is_defined(status):
query = query.where(sql.Task.status == status)
+ if is_defined(status_in):
+ query = query.where(via(sql.Task.status).in_(status_in))
if is_defined(task_type):
query = query.where(sql.Task.task_type == task_type)
if is_defined(task_args):
query = query.where(sql.Task.task_args == task_args)
+ if is_defined(inputs_hash):
+ query = query.where(sql.Task.inputs_hash == inputs_hash)
if is_defined(asf_uid):
query = query.where(sql.Task.asf_uid == asf_uid)
if is_defined(added):
diff --git a/atr/models/sql.py b/atr/models/sql.py
index c03a3953..c1eda568 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -357,6 +357,7 @@ class Task(sqlmodel.SQLModel, table=True):
status: TaskStatus = sqlmodel.Field(default=TaskStatus.QUEUED, index=True)
task_type: TaskType
task_args: Any =
sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON))
+ inputs_hash: str | None = sqlmodel.Field(default=None, index=True,
**example("blake3:7f83b1657ff1fc..."))
asf_uid: str
added: datetime.datetime = sqlmodel.Field(
default_factory=lambda: datetime.datetime.now(datetime.UTC),
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 00beb6d7..42e03406 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -17,7 +17,6 @@
import asyncio
import datetime
-import logging
import pathlib
from collections.abc import Awaitable, Callable, Coroutine
from typing import Any, Final
@@ -220,24 +219,24 @@ async def _draft_file_checks(
# TODO: Should we check .json files for their content?
# Ideally we would not have to do that
if path.name.endswith(".cdx.json"):
- data.add(
- await queued(
- asf_uid,
- sql.TaskType.SBOM_TOOL_SCORE,
- release,
- revision_number,
- caller_data,
- path_str,
- extra_args={
- "project_name": project_name,
- "version_name": release_version,
- "revision_number": revision_number,
- "previous_release_version": previous_version.version if
previous_version else None,
- "file_path": path_str,
- "asf_uid": asf_uid,
- },
- )
+ cdx_task = await queued(
+ asf_uid,
+ sql.TaskType.SBOM_TOOL_SCORE,
+ release,
+ revision_number,
+ caller_data,
+ path_str,
+ extra_args={
+ "project_name": project_name,
+ "version_name": release_version,
+ "revision_number": revision_number,
+ "previous_release_version": previous_version.version if
previous_version else None,
+ "file_path": path_str,
+ "asf_uid": asf_uid,
+ },
)
+ if cdx_task:
+ data.add(cdx_task)
async def keys_import_file(
@@ -299,17 +298,29 @@ async def queued(
extra_args: dict[str, Any] | None = None,
check_cache_key: dict[str, Any] | None = None,
) -> sql.Task | None:
+ # If there's a queued or running task for this same set of inputs and hash
value, don't start a new one
+ # If there isn't one, but there is an existing check result, also don't
run a new task, just use the existing one
if check_cache_key is not None:
- logging.info("cache key", check_cache_key)
hash_val = hashes.compute_dict_hash(check_cache_key)
if not data:
raise RuntimeError("DB Session is required for check_cache_key")
- existing = await data.check_result(inputs_hash=hash_val,
release_name=release.name).all()
- if existing:
- await attestable.write_checks_data(
- release.project.name, release.version, revision_number, [c.id
for c in existing]
- )
+ existing_task = await data.task(
+ inputs_hash=hash_val,
+ project_name=release.project_name,
+ version_name=release.version,
+ task_args=extra_args or {},
+ status_in=[sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE],
+ ).all()
+ if existing_task:
return None
+ else:
+ existing = await data.check_result(inputs_hash=hash_val,
release_name=release.name).all()
+ if existing:
+ await attestable.write_checks_data(
+ release.project.name, release.version, revision_number,
[c.id for c in existing]
+ )
+ return None
+
return sql.Task(
status=sql.TaskStatus.QUEUED,
task_type=task_type,
diff --git a/migrations/versions/0050_2026.02.17_7406bb29.py
b/migrations/versions/0050_2026.02.17_7406bb29.py
new file mode 100644
index 00000000..0b635c1c
--- /dev/null
+++ b/migrations/versions/0050_2026.02.17_7406bb29.py
@@ -0,0 +1,29 @@
+"""Add inputs hash to task table
+
+Revision ID: 0050_2026.02.17_7406bb29
+Revises: 0049_2026.02.11_5b874ed2
+Create Date: 2026-02-17 14:34:59.166215+00:00
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+# Revision identifiers, used by Alembic
+revision: str = "0050_2026.02.17_7406bb29"
+down_revision: str | None = "0049_2026.02.11_5b874ed2"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+ with op.batch_alter_table("task", schema=None) as batch_op:
+ batch_op.add_column(sa.Column("inputs_hash", sa.String(),
nullable=True))
+ batch_op.create_index(batch_op.f("ix_task_inputs_hash"),
["inputs_hash"], unique=False)
+
+
+def downgrade() -> None:
+ with op.batch_alter_table("task", schema=None) as batch_op:
+ batch_op.drop_index(batch_op.f("ix_task_inputs_hash"))
+ batch_op.drop_column("inputs_hash")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]