This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch arm in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 749818748646717afcb8d881e3673d163e044cca Author: Alastair McFarlane <[email protected]> AuthorDate: Tue Feb 17 17:42:44 2026 +0000 Remove check for task running and add unique constraint, which we try to catch the IntegrityError for --- atr/models/sql.py | 7 ++- atr/tasks/__init__.py | 57 ++++++++++++++++--------- migrations/versions/0051_2026.02.17_12ac0c6b.py | 28 ++++++++++++ 3 files changed, 72 insertions(+), 20 deletions(-) diff --git a/atr/models/sql.py b/atr/models/sql.py index c1eda568..9d2c9099 100644 --- a/atr/models/sql.py +++ b/atr/models/sql.py @@ -357,7 +357,12 @@ class Task(sqlmodel.SQLModel, table=True): status: TaskStatus = sqlmodel.Field(default=TaskStatus.QUEUED, index=True) task_type: TaskType task_args: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON)) - inputs_hash: str | None = sqlmodel.Field(default=None, index=True, **example("blake3:7f83b1657ff1fc...")) + inputs_hash: str | None = sqlmodel.Field( + default=None, + **example("blake3:7f83b1657ff1fc..."), + unique=True, + index=True, + ) asf_uid: str added: datetime.datetime = sqlmodel.Field( default_factory=lambda: datetime.datetime.now(datetime.UTC), diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index 3d16d841..7752f4f2 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -18,9 +18,11 @@ import asyncio import datetime import pathlib +import sqlite3 from collections.abc import Awaitable, Callable, Coroutine from typing import Any, Final +import sqlalchemy import sqlmodel import atr.attestable as attestable @@ -187,7 +189,7 @@ async def draft_checks( extra_args={"is_podling": is_podling}, ) if path_check_task: - data.add(path_check_task) + await _add_tasks(data, path_check_task) if caller_data is None: await data.commit() @@ -215,7 +217,7 @@ async def _draft_file_checks( for task in await task_function(asf_uid, release, revision_number, path_str, data): if task: task.revision_number = revision_number - data.add(task) + await _add_tasks(data, task) # TODO: Should we check .json files for their content? # Ideally we would not have to do that if path.name.endswith(".cdx.json"): @@ -236,7 +238,7 @@ async def _draft_file_checks( }, ) if cdx_task: - data.add(cdx_task) + await _add_tasks(data, cdx_task) async def keys_import_file( @@ -298,26 +300,43 @@ async def queued( extra_args: dict[str, Any] | None = None, check_cache_key: dict[str, Any] | None = None, ) -> sql.Task | None: + hash_val = None if check_cache_key is not None: hash_val = hashes.compute_dict_hash(check_cache_key) if not data: raise RuntimeError("DB Session is required for check_cache_key") - existing = await data.check_result(inputs_hash=hash_val, release_name=release.name).all() - if existing: - await attestable.write_checks_data( - release.project.name, release.version, revision_number, [c.id for c in existing] - ) - return None - return sql.Task( - status=sql.TaskStatus.QUEUED, - task_type=task_type, - task_args=extra_args or {}, - asf_uid=asf_uid, - project_name=release.project.name, - version_name=release.version, - revision_number=revision_number, - primary_rel_path=primary_rel_path, - ) + # existing = await data.check_result(inputs_hash=hash_val, release_name=release.name).all() + # if existing: + # await attestable.write_checks_data( + # release.project.name, release.version, revision_number, [c.id for c in existing] + # ) + # return None + return sql.Task( + status=sql.TaskStatus.QUEUED, + task_type=task_type, + task_args=extra_args or {}, + asf_uid=asf_uid, + project_name=release.project.name, + version_name=release.version, + revision_number=revision_number, + primary_rel_path=primary_rel_path, + inputs_hash=hash_val, + ) + + +async def _add_tasks(data: db.Session, *tasks: sql.Task) -> None: + for task in tasks: + try: + await data.flush() + data.add(task) + await data.flush() + # Catch failing unique constraint + except sqlalchemy.exc.IntegrityError as e: + if e.orig.sqlite_errorcode == 2067 and "task.inputs_hash" in e.orig.args[0]: + return None + else: + raise e + return None def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results | None]]: # noqa: C901 diff --git a/migrations/versions/0051_2026.02.17_12ac0c6b.py b/migrations/versions/0051_2026.02.17_12ac0c6b.py new file mode 100644 index 00000000..63bde01a --- /dev/null +++ b/migrations/versions/0051_2026.02.17_12ac0c6b.py @@ -0,0 +1,28 @@ +"""Unique index + +Revision ID: 0051_2026.02.17_12ac0c6b +Revises: 0050_2026.02.17_7406bb29 +Create Date: 2026-02-17 16:46:37.248657+00:00 +""" + +from collections.abc import Sequence + +from alembic import op + +# Revision identifiers, used by Alembic +revision: str = "0051_2026.02.17_12ac0c6b" +down_revision: str | None = "0050_2026.02.17_7406bb29" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + with op.batch_alter_table("task", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_task_inputs_hash")) + batch_op.create_index(batch_op.f("ix_task_inputs_hash"), ["inputs_hash"], unique=True) + + +def downgrade() -> None: + with op.batch_alter_table("task", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_task_inputs_hash")) + batch_op.create_index(batch_op.f("ix_task_inputs_hash"), ["inputs_hash"], unique=False) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
