This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch pending_dist_changes in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 8ed69eb50c6941792f66447bf63cdebcfdff9a3e Author: Alastair McFarlane <[email protected]> AuthorDate: Thu Jan 29 15:04:32 2026 +0000 #216 - Scheduled task for pending distributions, add created_by to dist table. --- atr/get/distribution.py | 6 ++++- atr/models/sql.py | 1 + atr/server.py | 3 +++ atr/storage/writers/distributions.py | 2 ++ atr/tasks/__init__.py | 1 + atr/tasks/distribution.py | 28 +++++++++++++++------ migrations/versions/0043_2026.01.29_d7d89670.py | 33 +++++++++++++++++++++++++ 7 files changed, 65 insertions(+), 9 deletions(-) diff --git a/atr/get/distribution.py b/atr/get/distribution.py index 7bc9162..4bbf922 100644 --- a/atr/get/distribution.py +++ b/atr/get/distribution.py @@ -17,6 +17,7 @@ from collections.abc import Sequence import asfquart.base as base +import htpy import atr.blueprints.get as get import atr.db as db @@ -77,12 +78,15 @@ async def list_get(session: web.Committer, project_name: str, version_name: str) ## Distributions block.h2["Distributions"] for dist in distributions: + title_extra = [] + if dist.pending: + title_extra.append(htpy.small(".text-muted")[" (this distribution is being verified by ATR)"]) ### Platform package version block.h3( # Cannot use "#id" here, because the ID contains "." # If an ID contains ".", htm parses that as a class id=f"distribution-{dist.identifier}" - )[dist.title] + )[dist.title, *title_extra] tbody = htm.tbody[ shared.distribution.html_tr("Release name", dist.release_name), shared.distribution.html_tr("Platform", dist.platform.value.name), diff --git a/atr/models/sql.py b/atr/models/sql.py index e830083..0b8d8a4 100644 --- a/atr/models/sql.py +++ b/atr/models/sql.py @@ -976,6 +976,7 @@ class Distribution(sqlmodel.SQLModel, table=True): upload_date: datetime.datetime | None = sqlmodel.Field(default=None) api_url: str | None = sqlmodel.Field(default=None) web_url: str | None = sqlmodel.Field(default=None) + created_by: str | None = sqlmodel.Field(default=None) # The API response can be huge, e.g. from npm # So we do not store it in the database # api_response: Any = sqlmodel.Field(sa_column=sqlalchemy.Column(sqlalchemy.JSON)) diff --git a/atr/server.py b/atr/server.py index 1ef0d60..70438d1 100644 --- a/atr/server.py +++ b/atr/server.py @@ -802,6 +802,9 @@ async def _register_recurrent_tasks() -> None: await asyncio.sleep(60) workflow = await tasks.workflow_update(asf_uid="system", schedule_next=True) log.info(f"Scheduled workflow status update with ID {workflow.id}") + await asyncio.sleep(60) + dist_check = await tasks.distribution_status_check(asf_uid="system", schedule_next=True) + log.info(f"Scheduled distribution status update with ID {dist_check.id}") except Exception as e: log.exception(f"Failed to schedule recurrent tasks: {e!s}") diff --git a/atr/storage/writers/distributions.py b/atr/storage/writers/distributions.py index 312a471..7a4b683 100644 --- a/atr/storage/writers/distributions.py +++ b/atr/storage/writers/distributions.py @@ -160,6 +160,7 @@ class CommitteeMember(CommitteeParticipant): upload_date=upload_date, api_url=api_url, web_url=web_url, + created_by=self.__asf_uid, ) if existing is None: self.__data.add(dist) @@ -271,6 +272,7 @@ class CommitteeMember(CommitteeParticipant): existing.upload_date = upload_date existing.api_url = api_url existing.web_url = web_url + existing.created_by = self.__asf_uid await self.__data.commit() return existing return None diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index ea613bb..8ba7fd3 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -71,6 +71,7 @@ async def clear_scheduled(caller_data: db.Session | None = None) -> None: [ sql.TaskType.METADATA_UPDATE, sql.TaskType.WORKFLOW_STATUS, + sql.TaskType.DISTRIBUTION_STATUS, ] ), via(sql.Task.status) == sql.TaskStatus.QUEUED, diff --git a/atr/tasks/distribution.py b/atr/tasks/distribution.py index 541f918..36caabf 100644 --- a/atr/tasks/distribution.py +++ b/atr/tasks/distribution.py @@ -39,10 +39,12 @@ class DistributionStatusCheckArgs(schema.Strict): @checks.with_model(DistributionStatusCheckArgs) async def status_check(args: DistributionStatusCheckArgs, *, task_id: int | None = None) -> results.Results | None: + log.info("Checking pending recorded distributions") dists = [] async with db.session() as data: dists = await data.distribution(pending=True, _with_release=True, _with_release_project=True).all() for dist in dists: + name = f"{dist.platform} {dist.owner_namespace} {dist.package} {dist.version}" dd = distribution.Data( platform=dist.platform, owner_namespace=dist.owner_namespace, @@ -50,30 +52,40 @@ async def status_check(args: DistributionStatusCheckArgs, *, task_id: int | None version=dist.version, details=False, ) + if not dist.created_by: + log.warning(f"Distribution {name} has no creator, skipping") + continue + if not dist.release.project.committee_name: + log.warning(f"Distribution {name} has no committee, skipping") + continue try: - async with storage.write_as_committee_member(str(dist.release.project.committee_name), args.asf_uid) as w: + async with storage.write_as_committee_member(dist.release.project.committee_name, dist.created_by) as w: if dist.retries >= _RETRY_LIMIT: await w.distributions.delete_distribution( dist.release_name, dist.platform, dist.owner_namespace, dist.package, dist.version ) - name = f"{dist.platform} {dist.owner_namespace} {dist.package} {dist.version}" log.error(f"Distribution {name} failed {_RETRY_LIMIT} times, skipping") continue + log.warning(f"Retrying distribution {name}") await w.distributions.record_from_data( dist.release_name, dist.staging, dd, + allow_retries=True, ) except storage.AccessError as e: msg = f"Failed to record distribution: {e}" log.error(msg) raise RuntimeError(msg) - if args.next_schedule_seconds: - next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=args.next_schedule_seconds) - await tasks.distribution_status_check(args.asf_uid, schedule=next_schedule, schedule_next=True) - log.info( - f"Scheduled next workflow status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}", - ) + finally: + if args.next_schedule_seconds: + next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta( + seconds=args.next_schedule_seconds + ) + await tasks.distribution_status_check(args.asf_uid, schedule=next_schedule, schedule_next=True) + log.info( + f"Scheduled next workflow status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}", + ) return results.DistributionStatusCheck( kind="distribution_status", ) diff --git a/migrations/versions/0043_2026.01.29_d7d89670.py b/migrations/versions/0043_2026.01.29_d7d89670.py new file mode 100644 index 0000000..8342377 --- /dev/null +++ b/migrations/versions/0043_2026.01.29_d7d89670.py @@ -0,0 +1,33 @@ +"""column for distribution creator + +Revision ID: 0043_2026.01.29_d7d89670 +Revises: 0042_2026.01.28_3e434625 +Create Date: 2026-01-29 13:56:34.371692+00:00 +""" + +from collections.abc import Sequence + +import sqlalchemy as sa +from alembic import op + +# Revision identifiers, used by Alembic +revision: str = "0043_2026.01.29_d7d89670" +down_revision: str | None = "0042_2026.01.28_3e434625" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("distribution", schema=None) as batch_op: + batch_op.add_column(sa.Column("created_by", sa.String(), nullable=True)) + + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("distribution", schema=None) as batch_op: + batch_op.drop_column("created_by") + + # ### end Alembic commands ### --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
