This is an automated email from the ASF dual-hosted git repository. arm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
commit 53be97cef533a11445bd4ec902174937d9174616 Author: Alastair McFarlane <[email protected]> AuthorDate: Wed Jan 7 11:09:13 2026 +0000 Add task which can trigger a specified github action, provided ATR has a valid token for that repository. --- atr/config.py | 1 + atr/models/results.py | 12 +++- atr/models/sql.py | 1 + atr/tasks/__init__.py | 3 + atr/tasks/gha.py | 160 ++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 176 insertions(+), 1 deletion(-) diff --git a/atr/config.py b/atr/config.py index 77d7d32..41195a9 100644 --- a/atr/config.py +++ b/atr/config.py @@ -57,6 +57,7 @@ class AppConfig: PUBSUB_USER = _config_secrets("PUBSUB_USER", STATE_DIR, default=None, cast=str) PUBSUB_PASSWORD = _config_secrets("PUBSUB_PASSWORD", STATE_DIR, default=None, cast=str) SVN_TOKEN = _config_secrets("SVN_TOKEN", STATE_DIR, default=None, cast=str) + GITHUB_TOKEN = _config_secrets("GITHUB_TOKEN", STATE_DIR, default=None, cast=str) DEBUG = False TEMPLATES_AUTO_RELOAD = False diff --git a/atr/models/results.py b/atr/models/results.py index 5d3ffbc..713291d 100644 --- a/atr/models/results.py +++ b/atr/models/results.py @@ -24,6 +24,15 @@ import atr.sbom.models.osv as osv from . import schema +class GithubActionsWorkflow(schema.Strict): + """Result of the task to run a Github workflow.""" + + kind: Literal["github_actions_workflow"] = schema.Field(alias="kind") + name: str = schema.description("The name of the action being performed") + run_id: int = schema.description("The ID of the workflow run") + url: str = schema.description("The URL of the workflow run") + + class HashingCheck(schema.Strict): """Result of the task to check the hash of a file.""" @@ -184,7 +193,8 @@ class MetadataUpdate(schema.Strict): Results = Annotated[ - HashingCheck + GithubActionsWorkflow + | HashingCheck | MessageSend | MetadataUpdate | SBOMAugment diff --git a/atr/models/sql.py b/atr/models/sql.py index 659468f..02b502f 100644 --- a/atr/models/sql.py +++ b/atr/models/sql.py @@ -179,6 +179,7 @@ class TaskStatus(str, enum.Enum): class TaskType(str, enum.Enum): + GITHUB_ACTION_WORKFLOW = "github_action_workflow" HASHING_CHECK = "hashing_check" KEYS_IMPORT_FILE = "keys_import_file" LICENSE_FILES = "license_files" diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py index 963f765..dde92ed 100644 --- a/atr/tasks/__init__.py +++ b/atr/tasks/__init__.py @@ -28,6 +28,7 @@ import atr.tasks.checks.rat as rat import atr.tasks.checks.signature as signature import atr.tasks.checks.targz as targz import atr.tasks.checks.zipformat as zipformat +import atr.tasks.gha as gha import atr.tasks.keys as keys import atr.tasks.message as message import atr.tasks.metadata as metadata @@ -188,6 +189,8 @@ def queued( def resolve(task_type: sql.TaskType) -> Callable[..., Awaitable[results.Results | None]]: # noqa: C901 match task_type: + case sql.TaskType.GITHUB_ACTION_WORKFLOW: + return gha.trigger_workflow case sql.TaskType.HASHING_CHECK: return hashing.check case sql.TaskType.KEYS_IMPORT_FILE: diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py new file mode 100644 index 0000000..3c811da --- /dev/null +++ b/atr/tasks/gha.py @@ -0,0 +1,160 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import asyncio +import json +import uuid +from collections.abc import Callable +from typing import Any, Final, NoReturn + +import aiohttp + +import atr.config as config +import atr.log as log +import atr.models.results as results +import atr.models.schema as schema +import atr.tasks.checks as checks + +_BASE_URL: Final[str] = "https://api.github.com/repos" +_IN_PROGRESS_STATUSES: Final[list[str]] = ["in_progress", "queued", "requested", "waiting", "pending", "expected"] +_COMPLETED_STATUSES: Final[list[str]] = ["completed"] +_FAILED_STATUSES: Final[list[str]] = ["failure", "startup_failure"] +_TIMEOUT_S = 5 + + +class GithubActionsWorkflow(schema.Strict): + """Arguments for the task to start a Github Actions workflow.""" + + owner: str = schema.description("Github owner of the repository") + repo: str = schema.description("Repository in which to start the workflow") + workflow_id: str = schema.description("Workflow ID") + ref: str = schema.description("Git ref to trigger the workflow") + arguments: dict[str, str] = schema.description("Workflow arguments") + name: str = schema.description("Name of the run") + + [email protected]_model(GithubActionsWorkflow) +async def trigger_workflow(args: GithubActionsWorkflow) -> results.Results | None: + unique_id = f"{args.name}-{uuid.uuid4()}" + payload = {"ref": args.ref, "inputs": {"atr-id": unique_id, **args.arguments}} + headers = {"Accept": "application/vnd.github+json", "Authorization": f"Bearer {config.get().GITHUB_TOKEN}"} + log.info( + f"Triggering Github workflow {args.owner}/{args.repo}/{args.workflow_id} with args: { + json.dumps(args.arguments, indent=2) + }" + ) + async with aiohttp.ClientSession() as session: + try: + async with session.post( + f"{_BASE_URL}/{args.owner}/{args.repo}/actions/workflows/{args.workflow_id}/dispatches", + headers=headers, + json=payload, + ) as response: + response.raise_for_status() + except aiohttp.ClientResponseError as e: + _fail(f"Failed to trigger workflow run: {e.message} ({e.status})") + + run, run_id = await _find_triggered_run(session, args, headers, unique_id) + + if run.get("status") in _IN_PROGRESS_STATUSES: + run = await _wait_for_completion(session, args, headers, run_id, unique_id) + + if run.get("status") in _FAILED_STATUSES: + _fail(f"Github workflow {args.owner}/{args.repo}/{args.workflow_id} run {run_id} failed with error") + if run.get("status") in _COMPLETED_STATUSES: + log.info(f"Workflow {args.owner}/{args.repo}/{args.workflow_id} run {run_id} completed successfully") + return results.GithubActionsWorkflow( + kind="github_actions_workflow", name=args.name, run_id=run_id, url=run.get("html_url", "") + ) + _fail(f"Timed out waiting for workflow {args.owner}/{args.repo}/{args.workflow_id}") + + +def _fail(message: str) -> NoReturn: + log.error(message) + raise RuntimeError(message) + + +async def _find_triggered_run( + session: aiohttp.ClientSession, + args: GithubActionsWorkflow, + headers: dict[str, str], + unique_id: str, +) -> tuple[dict[str, Any], int]: + """Find the workflow run that was just triggered.""" + + def get_run(resp: dict[str, Any]) -> dict[str, Any] | None: + return next( + (r for r in resp["workflow_runs"] if r["head_branch"] == args.ref and r["name"] == unique_id), + None, + ) + + run = await _request_and_retry( + session, f"{_BASE_URL}/{args.owner}/{args.repo}/actions/runs?event=workflow_dispatch", headers, get_run + ) + if run is None: + _fail(f"Failed to find triggered workflow run for {unique_id}") + run_id: int | None = run.get("id") + if run_id is None: + _fail(f"Found run for {unique_id} but run ID is missing") + return run, run_id + + +async def _request_and_retry( + session: aiohttp.client.ClientSession, + url: str, + headers: dict[str, str], + response_func: Callable[[Any], dict[str, Any] | None], +) -> dict[str, Any] | None: + for _attempt in range(_TIMEOUT_S * 10): # timeout_s * 10): + async with session.get( + url, + headers=headers, + ) as response: + try: + response.raise_for_status() + runs = await response.json() + data = response_func(runs) + if not data: + await asyncio.sleep(0.1) + else: + return data + except aiohttp.ClientResponseError as e: + # We don't raise here as it could be an emphemeral error - if it continues it will return None + log.error(f"Failure calling Github: {e.message} ({e.status}, attempt {_attempt + 1})") + await asyncio.sleep(0.1) + return None + + +async def _wait_for_completion( + session: aiohttp.ClientSession, + args: GithubActionsWorkflow, + headers: dict[str, str], + run_id: int, + unique_id: str, +) -> dict[str, Any]: + """Wait for a workflow run to complete.""" + + def filter_run(resp: dict[str, Any]) -> dict[str, Any] | None: + if resp.get("status") not in _IN_PROGRESS_STATUSES: + return resp + return None + + run = await _request_and_retry( + session, f"{_BASE_URL}/{args.owner}/{args.repo}/actions/runs/{run_id}", headers, filter_run + ) + if run is None: + _fail(f"Failed to find triggered workflow run for {unique_id}") + return run --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
