This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch arm
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 749818748646717afcb8d881e3673d163e044cca
Author: Alastair McFarlane <[email protected]>
AuthorDate: Tue Feb 17 17:42:44 2026 +0000

    Remove the check for an already-running task and instead add a unique
    constraint, catching the IntegrityError that duplicate inserts raise
---
 atr/models/sql.py                               |  7 ++-
 atr/tasks/__init__.py                           | 57 ++++++++++++++++---------
 migrations/versions/0051_2026.02.17_12ac0c6b.py | 28 ++++++++++++
 3 files changed, 72 insertions(+), 20 deletions(-)
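
The shape of the change: instead of querying for an already-queued task
before inserting (a check-then-insert sequence that can race when the
same check is queued twice concurrently), the insert is now attempted
unconditionally and the unique index on task.inputs_hash arbitrates. A
minimal sketch of the pattern using only the stdlib sqlite3 module (the
table and values here are illustrative, not the project's real schema):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, inputs_hash TEXT UNIQUE)")
    conn.execute("INSERT INTO task (inputs_hash) VALUES ('blake3:abc')")
    try:
        # The second insert with the same hash is rejected atomically
        conn.execute("INSERT INTO task (inputs_hash) VALUES ('blake3:abc')")
    except sqlite3.IntegrityError as exc:
        # On Python 3.11+: sqlite_errorcode is 2067 (SQLITE_CONSTRAINT_UNIQUE)
        # and the message names the column,
        # "UNIQUE constraint failed: task.inputs_hash"
        print(exc.sqlite_errorcode, exc.args[0])

That is where the 2067 error code and the "task.inputs_hash" substring
test in _add_tasks below come from.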

diff --git a/atr/models/sql.py b/atr/models/sql.py
index c1eda568..9d2c9099 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -357,7 +357,12 @@ class Task(sqlmodel.SQLModel, table=True):
     status: TaskStatus = sqlmodel.Field(default=TaskStatus.QUEUED, index=True)
     task_type: TaskType
     task_args: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON))
-    inputs_hash: str | None = sqlmodel.Field(default=None, index=True, **example("blake3:7f83b1657ff1fc..."))
+    inputs_hash: str | None = sqlmodel.Field(
+        default=None,
+        **example("blake3:7f83b1657ff1fc..."),
+        unique=True,
+        index=True,
+    )
     asf_uid: str
     added: datetime.datetime = sqlmodel.Field(
         default_factory=lambda: datetime.datetime.now(datetime.UTC),
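
Since inputs_hash stays Optional, it is worth noting that SQLite treats
NULLs as distinct under a unique index, so tasks queued without a
check_cache_key (inputs_hash=None) never collide with one another. A
quick stdlib check of that assumption:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE task (id INTEGER PRIMARY KEY, inputs_hash TEXT UNIQUE)")
    # Two NULL hashes coexist; only concrete duplicate hashes are rejected
    conn.execute("INSERT INTO task (inputs_hash) VALUES (NULL)")
    conn.execute("INSERT INTO task (inputs_hash) VALUES (NULL)")
    print(conn.execute("SELECT COUNT(*) FROM task").fetchone())  # (2,)
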
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 3d16d841..7752f4f2 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -18,9 +18,11 @@
 import asyncio
 import datetime
 import pathlib
+import sqlite3
 from collections.abc import Awaitable, Callable, Coroutine
 from typing import Any, Final
 
+import sqlalchemy
 import sqlmodel
 
 import atr.attestable as attestable
@@ -187,7 +189,7 @@ async def draft_checks(
             extra_args={"is_podling": is_podling},
         )
         if path_check_task:
-            data.add(path_check_task)
+            await _add_tasks(data, path_check_task)
             if caller_data is None:
                 await data.commit()
 
@@ -215,7 +217,7 @@ async def _draft_file_checks(
         for task in await task_function(asf_uid, release, revision_number, path_str, data):
             if task:
                 task.revision_number = revision_number
-                data.add(task)
+                await _add_tasks(data, task)
     # TODO: Should we check .json files for their content?
     # Ideally we would not have to do that
     if path.name.endswith(".cdx.json"):
@@ -236,7 +238,7 @@ async def _draft_file_checks(
             },
         )
         if cdx_task:
-            data.add(cdx_task)
+            await _add_tasks(data, cdx_task)
 
 
 async def keys_import_file(
@@ -298,26 +300,43 @@ async def queued(
     extra_args: dict[str, Any] | None = None,
     check_cache_key: dict[str, Any] | None = None,
 ) -> sql.Task | None:
+    hash_val = None
     if check_cache_key is not None:
         hash_val = hashes.compute_dict_hash(check_cache_key)
         if not data:
             raise RuntimeError("DB Session is required for check_cache_key")
-        existing = await data.check_result(inputs_hash=hash_val, release_name=release.name).all()
-        if existing:
-            await attestable.write_checks_data(
-                release.project.name, release.version, revision_number, [c.id for c in existing]
-            )
-            return None
-    return sql.Task(
-        status=sql.TaskStatus.QUEUED,
-        task_type=task_type,
-        task_args=extra_args or {},
-        asf_uid=asf_uid,
-        project_name=release.project.name,
-        version_name=release.version,
-        revision_number=revision_number,
-        primary_rel_path=primary_rel_path,
-    )
+        # existing = await data.check_result(inputs_hash=hash_val, release_name=release.name).all()
+        # if existing:
+        #     await attestable.write_checks_data(
+        #         release.project.name, release.version, revision_number, [c.id for c in existing]
+        #     )
+        #     return None
+    return sql.Task(
+        status=sql.TaskStatus.QUEUED,
+        task_type=task_type,
+        task_args=extra_args or {},
+        asf_uid=asf_uid,
+        project_name=release.project.name,
+        version_name=release.version,
+        revision_number=revision_number,
+        primary_rel_path=primary_rel_path,
+        inputs_hash=hash_val,
+    )
+
+
+async def _add_tasks(data: db.Session, *tasks: sql.Task) -> None:
+    for task in tasks:
+        try:
+            # Flush any earlier pending state first, so that a failure
+            # below can only come from this task's insert
+            await data.flush()
+            data.add(task)
+            await data.flush()
+        # Catch the unique constraint on task.inputs_hash failing
+        except sqlalchemy.exc.IntegrityError as e:
+            duplicate = (
+                isinstance(e.orig, sqlite3.IntegrityError)
+                and e.orig.sqlite_errorcode == 2067  # SQLITE_CONSTRAINT_UNIQUE
+                and "task.inputs_hash" in e.orig.args[0]
+            )
+            if not duplicate:
+                raise
+            # An identical task is already queued: skip this duplicate
+            # and keep going with the remaining tasks
 
 
 def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results | None]]:  # noqa: C901
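
A caveat with _add_tasks as written: once a flush fails, SQLAlchemy
invalidates the session's transaction, and further flushes or a later
commit raise PendingRollbackError until a rollback happens. A common fix
is a SAVEPOINT per insert; a sketch of that variant, assuming db.Session
exposes SQLAlchemy's AsyncSession.begin_nested() (not shown in this
diff):

    import sqlalchemy

    async def _add_task_nested(data, task) -> bool:
        # Hypothetical helper: True if inserted, False if the unique
        # index reported the task as a duplicate
        try:
            async with data.begin_nested():  # emits SAVEPOINT / RELEASE
                data.add(task)
                await data.flush()
            return True
        except sqlalchemy.exc.IntegrityError as e:
            if "task.inputs_hash" not in str(e.orig):
                raise
            # Only the savepoint rolled back; the outer transaction and
            # previously inserted tasks remain intact
            return False
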
diff --git a/migrations/versions/0051_2026.02.17_12ac0c6b.py b/migrations/versions/0051_2026.02.17_12ac0c6b.py
new file mode 100644
index 00000000..63bde01a
--- /dev/null
+++ b/migrations/versions/0051_2026.02.17_12ac0c6b.py
@@ -0,0 +1,28 @@
+"""Unique index
+
+Revision ID: 0051_2026.02.17_12ac0c6b
+Revises: 0050_2026.02.17_7406bb29
+Create Date: 2026-02-17 16:46:37.248657+00:00
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+
+# Revision identifiers, used by Alembic
+revision: str = "0051_2026.02.17_12ac0c6b"
+down_revision: str | None = "0050_2026.02.17_7406bb29"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    with op.batch_alter_table("task", schema=None) as batch_op:
+        batch_op.drop_index(batch_op.f("ix_task_inputs_hash"))
+        batch_op.create_index(batch_op.f("ix_task_inputs_hash"), ["inputs_hash"], unique=True)
+
+
+def downgrade() -> None:
+    with op.batch_alter_table("task", schema=None) as batch_op:
+        batch_op.drop_index(batch_op.f("ix_task_inputs_hash"))
+        batch_op.create_index(batch_op.f("ix_task_inputs_hash"), ["inputs_hash"], unique=False)

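The migration rebuilds ix_task_inputs_hash as unique via batch mode. One
caveat: creating a unique index fails if historical rows already hold
duplicate non-NULL hashes. A hedged pre-step for upgrade(), keeping the
oldest row per hash (this assumes the task table's primary key column is
named "id", which the diff above does not show):

    def upgrade() -> None:
        # Hypothetical guard before the index rebuild: prune duplicate
        # non-NULL hashes so the unique index can always be created
        op.execute(
            "DELETE FROM task WHERE inputs_hash IS NOT NULL AND id NOT IN "
            "(SELECT MIN(id) FROM task GROUP BY inputs_hash)"
        )
        with op.batch_alter_table("task", schema=None) as batch_op:
            batch_op.drop_index(batch_op.f("ix_task_inputs_hash"))
            batch_op.create_index(batch_op.f("ix_task_inputs_hash"), ["inputs_hash"], unique=True)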
