This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 46e8fadfcc0c4016f80929e3e0f89da6d333b916 Author: Alastair McFarlane <[email protected]> AuthorDate: Tue Feb 17 16:36:20 2026 +0000 Remove check for task running and add unique constraint, for which we try to catch the IntegrityError. Include in playwright tests and don't use revision number to filter individual check results. --- atr/get/result.py | 1 - atr/models/sql.py | 7 ++- atr/tasks/__init__.py | 68 ++++++++++++++----------- migrations/versions/0051_2026.02.17_12ac0c6b.py | 28 ++++++++++ playwright/test.py | 52 +++++++++++++++++++ 5 files changed, 123 insertions(+), 33 deletions(-) diff --git a/atr/get/result.py b/atr/get/result.py index eaf4142e..06bbf328 100644 --- a/atr/get/result.py +++ b/atr/get/result.py @@ -53,7 +53,6 @@ async def data( check_result = await data.check_result( id=check_id, release_name=release.name, - revision_number=release.latest_revision_number, ).demand(base.ASFQuartException("Check result not found", errorcode=404)) payload = check_result.model_dump(mode="json", exclude={"release"}) diff --git a/atr/models/sql.py b/atr/models/sql.py index c1eda568..9d2c9099 100644 --- a/atr/models/sql.py +++ b/atr/models/sql.py @@ -357,7 +357,12 @@ class Task(sqlmodel.SQLModel, table=True): status: TaskStatus = sqlmodel.Field(default=TaskStatus.QUEUED, index=True) task_type: TaskType task_args: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON)) - inputs_hash: str | None = sqlmodel.Field(default=None, index=True, **example("blake3:7f83b1657ff1fc...")) + inputs_hash: str | None = sqlmodel.Field( + default=None, + **example("blake3:7f83b1657ff1fc..."), + unique=True, + index=True, + ) asf_uid: str added: datetime.datetime = sqlmodel.Field( default_factory=lambda: datetime.datetime.now(datetime.UTC), diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index 42e03406..60b5df83 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -18,9 +18,11 @@ import asyncio import datetime import pathlib +import sqlite3 from collections.abc import Awaitable, Callable, Coroutine from typing import Any, Final +import sqlalchemy.exc import sqlmodel import atr.attestable as attestable @@ -187,7 +189,7 @@ async def draft_checks( extra_args={"is_podling": is_podling}, ) if path_check_task: - data.add(path_check_task) + await _add_task(data, path_check_task) if caller_data is None: await data.commit() @@ -215,7 +217,7 @@ async def _draft_file_checks( for task in await task_function(asf_uid, release, revision_number, path_str, data): if task: task.revision_number = revision_number - data.add(task) + await _add_task(data, task) # TODO: Should we check .json files for their content? # Ideally we would not have to do that if path.name.endswith(".cdx.json"): @@ -236,7 +238,7 @@ async def _draft_file_checks( }, ) if cdx_task: - data.add(cdx_task) + await _add_task(data, cdx_task) async def keys_import_file( @@ -298,39 +300,43 @@ async def queued( extra_args: dict[str, Any] | None = None, check_cache_key: dict[str, Any] | None = None, ) -> sql.Task | None: - # If there's a queued or running task for this same set of inputs and hash value, don't start a new one - # If there isn't one, but there is an existing check result, also don't run a new task, just use the existing one + hash_val = None if check_cache_key is not None: hash_val = hashes.compute_dict_hash(check_cache_key) if not data: raise RuntimeError("DB Session is required for check_cache_key") - existing_task = await data.task( - inputs_hash=hash_val, - project_name=release.project_name, - version_name=release.version, - task_args=extra_args or {}, - status_in=[sql.TaskStatus.QUEUED, sql.TaskStatus.ACTIVE], - ).all() - if existing_task: + existing = await data.check_result(inputs_hash=hash_val, release_name=release.name).all() + if existing: + await attestable.write_checks_data( + release.project.name, release.version, revision_number, [c.id for c in existing] + ) return None - else: - existing = await data.check_result(inputs_hash=hash_val, release_name=release.name).all() - if existing: - await attestable.write_checks_data( - release.project.name, release.version, revision_number, [c.id for c in existing] - ) - return None - - return sql.Task( - status=sql.TaskStatus.QUEUED, - task_type=task_type, - task_args=extra_args or {}, - asf_uid=asf_uid, - project_name=release.project.name, - version_name=release.version, - revision_number=revision_number, - primary_rel_path=primary_rel_path, - ) + return sql.Task( + status=sql.TaskStatus.QUEUED, + task_type=task_type, + task_args=extra_args or {}, + asf_uid=asf_uid, + project_name=release.project.name, + version_name=release.version, + revision_number=revision_number, + primary_rel_path=primary_rel_path, + inputs_hash=hash_val, + ) + + +async def _add_task(data: db.Session, task: sql.Task) -> None: + try: + async with data.begin_nested(): + data.add(task) + await data.flush() + except sqlalchemy.exc.IntegrityError as e: + if ( + isinstance(e.orig, sqlite3.IntegrityError) + and (e.orig.sqlite_errorcode == sqlite3.SQLITE_CONSTRAINT_UNIQUE) + and ("task.inputs_hash" in str(e.orig)) + ): + return + raise def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results | None]]: # noqa: C901 diff --git a/migrations/versions/0051_2026.02.17_12ac0c6b.py b/migrations/versions/0051_2026.02.17_12ac0c6b.py new file mode 100644 index 00000000..63bde01a --- /dev/null +++ b/migrations/versions/0051_2026.02.17_12ac0c6b.py @@ -0,0 +1,28 @@ +"""Unique index + +Revision ID: 0051_2026.02.17_12ac0c6b +Revises: 0050_2026.02.17_7406bb29 +Create Date: 2026-02-17 16:46:37.248657+00:00 +""" + +from collections.abc import Sequence + +from alembic import op + +# Revision identifiers, used by Alembic +revision: str = "0051_2026.02.17_12ac0c6b" +down_revision: str | None = "0050_2026.02.17_7406bb29" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + with op.batch_alter_table("task", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_task_inputs_hash")) + batch_op.create_index(batch_op.f("ix_task_inputs_hash"), ["inputs_hash"], unique=True) + + +def downgrade() -> None: + with op.batch_alter_table("task", schema=None) as batch_op: + batch_op.drop_index(batch_op.f("ix_task_inputs_hash")) + batch_op.create_index(batch_op.f("ix_task_inputs_hash"), ["inputs_hash"], unique=False) diff --git a/playwright/test.py b/playwright/test.py index 1e55c7b7..a82a37ae 100755 --- a/playwright/test.py +++ b/playwright/test.py @@ -20,6 +20,7 @@ import argparse import dataclasses import glob +import json import logging import os import re @@ -554,6 +555,7 @@ def test_all(page: Page, credentials: Credentials, skip_slow: bool) -> None: test_checks_04_paths, test_checks_05_signature, test_checks_06_targz, + test_checks_07_cache, ] # Order between our tests must be preserved @@ -748,6 +750,56 @@ def test_checks_06_targz(page: Page, credentials: Credentials) -> None: logging.info("Targz Structure status verified as Success") +def test_checks_07_cache(page: Page, credentials: Credentials) -> None: + project_name = TEST_PROJECT + version_name = "0.2" + filename_targz = f"apache-{project_name}-{version_name}.tar.gz" + report_file_path = f"/report/{project_name}/{version_name}/{filename_targz}" + + logging.info(f"Starting check cache checks for {filename_targz}") + logging.info("Uploading new file to create new revision") + logging.info(f"Navigating to the upload file page for {TEST_PROJECT} {version_name}") + go_to_path(page, f"/upload/{TEST_PROJECT}/{version_name}") + logging.info("Upload file page loaded") + + logging.info("Locating the file input") + file_input_locator = page.locator('input[name="file_data"]') + expect(file_input_locator).to_be_visible() + + logging.info("Setting the input file to /run/tests/example.txt") + file_input_locator.set_input_files("/run/tests/example.txt") + + logging.info("Locating and activating the add files button") + submit_button_locator = page.get_by_role("button", name="Add files") + expect(submit_button_locator).to_be_enabled() + submit_button_locator.click() + + logging.info("Waiting for upload to complete and redirect to compose page") + page.wait_for_url(f"**/compose/{TEST_PROJECT}/{version_name}*", timeout=30000) + wait_for_path(page, f"/compose/{TEST_PROJECT}/{version_name}") + logging.info("Add file actions completed successfully") + + logging.info(f"Navigating to report page {report_file_path}") + go_to_path(page, report_file_path) + logging.info(f"Successfully navigated to {report_file_path}") + + ensure_success_results_are_visible(page, "primary") + + logging.info("Verifying Targz Integrity status exists") + integrity_row_locator = page.locator("tr.atr-result-primary:has(th:has-text('Targz Integrity'))") + expect(integrity_row_locator).to_be_visible() + + logging.info("Verifying Targz Integrity result is from previous revision") + check_link_locator = integrity_row_locator.locator("th:has-text('Targz Integrity') a") + check_link_url = (check_link_locator.get_attribute("href") or "").replace(ATR_BASE_URL, "") + check_link_locator.click() + logging.info(f"Waiting for raw check result to load: {check_link_url}") + wait_for_path(page, check_link_url) + check_result = json.loads(page.locator("pre").inner_text()) + logging.info("Verifying revision number") + assert check_result["revision_number"] == "00001" + + def test_openpgp_01_upload(page: Page, credentials: Credentials) -> None: for key_path in glob.glob("/run/tests/*.asc"): key_fingerprint_lower = os.path.basename(key_path).split(".")[0].lower() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
