This is an automated email from the ASF dual-hosted git repository.

arm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git

commit 2e99ba3410485f23a5d1886f2f39a758c3a6ac15
Author: Alastair McFarlane <[email protected]>
AuthorDate: Fri Jan 16 10:11:50 2026 +0000

    Move asf_uid to args model and tweak logic to clear scheduled tasks
---
 atr/models/sql.py     | 2 +-
 atr/tasks/__init__.py | 5 ++---
 atr/tasks/gha.py      | 9 +++++----
 atr/worker.py         | 5 ++---
 4 files changed, 10 insertions(+), 11 deletions(-)

diff --git a/atr/models/sql.py b/atr/models/sql.py
index 5891a04..444c134 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -358,7 +358,7 @@ class Task(sqlmodel.SQLModel, table=True):
         default_factory=lambda: datetime.datetime.now(datetime.UTC),
         sa_column=sqlalchemy.Column(UTCDateTime, index=True),
     )
-    scheduled: datetime.datetime = sqlmodel.Field(
+    scheduled: datetime.datetime | None = sqlmodel.Field(
         default=None,
         sa_column=sqlalchemy.Column(UTCDateTime, index=True),
     )
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index b575a83..325a7e6 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -64,7 +64,6 @@ async def clear_scheduled(caller_data: db.Session | None = None):
     """Clear all future scheduled tasks of the given types."""
     async with db.ensure_session(caller_data) as data:
         via = sql.validate_instrumented_attribute
-        now = datetime.datetime.now(datetime.UTC)
 
         delete_stmt = sqlmodel.delete(sql.Task).where(
             via(sql.Task.task_type).in_(
@@ -74,7 +73,7 @@ async def clear_scheduled(caller_data: db.Session | None = None):
                 ]
             ),
             via(sql.Task.status) == sql.TaskStatus.QUEUED,
-            sqlmodel.or_(via(sql.Task.scheduled).is_(None), via(sql.Task.scheduled) > now),
+            via(sql.Task.scheduled).is_not(None),
         )
 
         await data.execute(delete_stmt)
@@ -302,7 +301,7 @@ async def workflow_update(
     schedule_next: bool = False,
 ) -> sql.Task:
     """Queue a workflow status update task."""
-    args = gha.WorkflowStatusCheck(next_schedule_seconds=0, run_id=0)
+    args = gha.WorkflowStatusCheck(next_schedule_seconds=0, run_id=0, asf_uid=asf_uid)
     if schedule_next:
         args.next_schedule_seconds = 2 * 60
     async with db.ensure_session(caller_data) as data:
diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py
index 1e8eda1..0baf302 100644
--- a/atr/tasks/gha.py
+++ b/atr/tasks/gha.py
@@ -64,6 +64,7 @@ class DistributionWorkflow(schema.Strict):
 class WorkflowStatusCheck(schema.Strict):
     run_id: int | None = schema.description("Run ID")
     next_schedule_seconds: int = pydantic.Field(default=0, description="The next scheduled time")
+    asf_uid: str = schema.description("ASF UID of the user triggering the workflow")
 
 
 @checks.with_model(DistributionWorkflow)
@@ -123,7 +124,7 @@ async def trigger_workflow(args: DistributionWorkflow, *, task_id: int | None =
 
 
 @checks.with_model(WorkflowStatusCheck)
-async def status_check(args: WorkflowStatusCheck, asf_uid: str) -> DistributionWorkflowStatus:
+async def status_check(args: WorkflowStatusCheck) -> DistributionWorkflowStatus:
     """Check remote workflow statuses."""
 
     headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {config.get().GITHUB_TOKEN}"}
@@ -182,7 +183,7 @@ async def status_check(args: WorkflowStatusCheck, asf_uid: str) -> DistributionW
         )
 
     # Schedule next update
-    await _schedule_next(args, asf_uid)
+    await _schedule_next(args)
 
     return results.DistributionWorkflowStatus(
         kind="distribution_workflow_status",
@@ -274,10 +275,10 @@ async def _request_and_retry(
     return None
 
 
-async def _schedule_next(args: WorkflowStatusCheck, asf_uid: str) -> None:
+async def _schedule_next(args: WorkflowStatusCheck) -> None:
     if args.next_schedule_seconds:
         next_schedule = datetime.datetime.now(datetime.UTC) + datetime.timedelta(seconds=args.next_schedule_seconds)
-        await tasks.workflow_update(asf_uid, schedule=next_schedule, schedule_next=True)
+        await tasks.workflow_update(args.asf_uid, schedule=next_schedule, schedule_next=True)
         log.info(
             f"Scheduled next workflow status update for: {next_schedule.strftime('%Y-%m-%d %H:%M:%S')}",
         )
diff --git a/atr/worker.py b/atr/worker.py
index aa19e28..b1b8f9c 100644
--- a/atr/worker.py
+++ b/atr/worker.py
@@ -120,7 +120,8 @@ async def _task_next_claim() -> tuple[int, str, list[str] | dict[str, Any], str]
                 sqlmodel.and_(
                     sql.Task.status == task.QUEUED,
                     sqlmodel.or_(
-                        via(sql.Task.scheduled).is_(None), sql.Task.scheduled <= datetime.datetime.now(datetime.UTC)
+                        via(sql.Task.scheduled).is_(None),
+                        via(sql.Task.scheduled) <= datetime.datetime.now(datetime.UTC),
                     ),
                 )
             )
@@ -178,8 +179,6 @@ async def _task_process(task_id: int, task_type: str, task_args: list[str] | dic
         additional_kwargs = {}
         if sig.parameters.get("task_id") is not None:
             additional_kwargs["task_id"] = task_id
-        if sig.parameters.get("asf_uid") is not None:
-            additional_kwargs["asf_uid"] = asf_uid
 
         handler_result = await handler(task_args, **additional_kwargs)
         task_results = handler_result


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
