This is an automated email from the ASF dual-hosted git repository.
arm pushed a commit to branch gha-connectivity
in repository https://gitbox.apache.org/repos/asf/tooling-trusted-releases.git
The following commit(s) were added to refs/heads/gha-connectivity by this push:
new d99a74f Add task which can trigger a specified github action,
provided ATR has a valid token for that repository.
d99a74f is described below
commit d99a74fd98f025183a06a6d945528cd348b6c1e0
Author: Alastair McFarlane <[email protected]>
AuthorDate: Wed Jan 7 11:09:13 2026 +0000
Add task which can trigger a specified github action, provided ATR has a
valid token for that repository.
---
atr/config.py | 1 +
atr/models/results.py | 12 +-
atr/models/sql.py | 1 +
atr/tasks/__init__.py | 3 +
atr/tasks/gha.py | 160 ++++++++++++++++++++++++
migrations/versions/0035_2026.01.07_7a420374.py | 72 +++++++++++
6 files changed, 248 insertions(+), 1 deletion(-)
diff --git a/atr/config.py b/atr/config.py
index 77d7d32..41195a9 100644
--- a/atr/config.py
+++ b/atr/config.py
@@ -57,6 +57,7 @@ class AppConfig:
PUBSUB_USER = _config_secrets("PUBSUB_USER", STATE_DIR, default=None,
cast=str)
PUBSUB_PASSWORD = _config_secrets("PUBSUB_PASSWORD", STATE_DIR,
default=None, cast=str)
SVN_TOKEN = _config_secrets("SVN_TOKEN", STATE_DIR, default=None, cast=str)
+ GITHUB_TOKEN = _config_secrets("GITHUB_TOKEN", STATE_DIR, default=None,
cast=str)
DEBUG = False
TEMPLATES_AUTO_RELOAD = False
diff --git a/atr/models/results.py b/atr/models/results.py
index 5d3ffbc..713291d 100644
--- a/atr/models/results.py
+++ b/atr/models/results.py
@@ -24,6 +24,15 @@ import atr.sbom.models.osv as osv
from . import schema
class GithubActionsWorkflow(schema.Strict):
    """Result of the task to run a Github workflow."""

    # Discriminator used by the Results union in this module.
    kind: Literal["github_actions_workflow"] = schema.Field(alias="kind")
    # Name supplied in the task arguments when the workflow was triggered.
    name: str = schema.description("The name of the action being performed")
    # Numeric run ID assigned by GitHub to the workflow run.
    run_id: int = schema.description("The ID of the workflow run")
    # The run's html_url as returned by the GitHub API (may be empty if absent).
    url: str = schema.description("The URL of the workflow run")
+
class HashingCheck(schema.Strict):
"""Result of the task to check the hash of a file."""
@@ -184,7 +193,8 @@ class MetadataUpdate(schema.Strict):
Results = Annotated[
- HashingCheck
+ GithubActionsWorkflow
+ | HashingCheck
| MessageSend
| MetadataUpdate
| SBOMAugment
diff --git a/atr/models/sql.py b/atr/models/sql.py
index 659468f..02b502f 100644
--- a/atr/models/sql.py
+++ b/atr/models/sql.py
@@ -179,6 +179,7 @@ class TaskStatus(str, enum.Enum):
class TaskType(str, enum.Enum):
+ GITHUB_ACTION_WORKFLOW = "github_action_workflow"
HASHING_CHECK = "hashing_check"
KEYS_IMPORT_FILE = "keys_import_file"
LICENSE_FILES = "license_files"
diff --git a/atr/tasks/__init__.py b/atr/tasks/__init__.py
index 963f765..dde92ed 100644
--- a/atr/tasks/__init__.py
+++ b/atr/tasks/__init__.py
@@ -28,6 +28,7 @@ import atr.tasks.checks.rat as rat
import atr.tasks.checks.signature as signature
import atr.tasks.checks.targz as targz
import atr.tasks.checks.zipformat as zipformat
+import atr.tasks.gha as gha
import atr.tasks.keys as keys
import atr.tasks.message as message
import atr.tasks.metadata as metadata
@@ -188,6 +189,8 @@ def queued(
def resolve(task_type: sql.TaskType) -> Callable[...,
Awaitable[results.Results | None]]: # noqa: C901
match task_type:
+ case sql.TaskType.GITHUB_ACTION_WORKFLOW:
+ return gha.trigger_workflow
case sql.TaskType.HASHING_CHECK:
return hashing.check
case sql.TaskType.KEYS_IMPORT_FILE:
diff --git a/atr/tasks/gha.py b/atr/tasks/gha.py
new file mode 100644
index 0000000..c0f6eb6
--- /dev/null
+++ b/atr/tasks/gha.py
@@ -0,0 +1,160 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import asyncio
+import json
+import uuid
+from collections.abc import Callable
+from typing import Any, Final, NoReturn
+
+import aiohttp
+
+import atr.config as config
+import atr.log as log
+import atr.models.results as results
+import atr.models.schema as schema
+import atr.tasks.checks as checks
+
_BASE_URL: Final[str] = "https://api.github.com/repos"
# Lifecycle "status" values meaning a workflow run has not finished yet.
_IN_PROGRESS_STATUSES: Final[list[str]] = ["in_progress", "queued", "requested", "waiting", "pending"]
# NOTE(review): per the GitHub REST API, "success" (and "failure"/"timed_out"
# below) are values of a run's "conclusion" field, while "completed" is a
# "status" value — confirm which field these lists are compared against.
_COMPLETED_STATUSES: Final[list[str]] = ["completed", "success"]
_FAILED_STATUSES: Final[list[str]] = ["failure", "timed_out"]
# Polling budget: _request_and_retry makes _TIMEOUT_S * 10 attempts at ~0.1 s
# intervals, so this is approximately a timeout in seconds.
_TIMEOUT_S = 5
+
+
class GithubActionsWorkflow(schema.Strict):
    """Arguments for the task that triggers a GitHub Actions workflow.

    The workflow is identified by owner/repo/workflow_id and dispatched on
    *ref* with the given input *arguments*; *name* labels the run so the
    task can find it again after dispatching.
    """

    owner: str = schema.description("Github owner of the repository")
    repo: str = schema.description("Repository in which to start the workflow")
    workflow_id: str = schema.description("Workflow ID")
    ref: str = schema.description("Git ref to trigger the workflow")
    arguments: dict[str, str] = schema.description("Workflow arguments")
    name: str = schema.description("Name of the run")
+
@checks.with_model(GithubActionsWorkflow)
async def trigger_workflow(args: GithubActionsWorkflow) -> results.Results | None:
    """Trigger a GitHub Actions workflow and wait for it to finish.

    Dispatches the workflow with a unique id in its inputs, locates the
    resulting run, polls until it leaves the in-progress states, and returns
    a results.GithubActionsWorkflow on success.

    Raises RuntimeError (via _fail) when the dispatch is rejected, the run
    cannot be found, the run fails, or polling times out.
    """
    # The workflow is expected to echo this id back as the run name so that
    # _find_triggered_run can identify the run we just dispatched.
    unique_id = f"{args.name}-{uuid.uuid4()}"
    payload = {"ref": args.ref, "inputs": {"id": unique_id, **args.arguments}}
    headers = {
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {config.get().GITHUB_TOKEN}",
    }
    log.info(
        f"Triggering Github workflow {args.owner}/{args.repo}/{args.workflow_id} "
        f"with args: {json.dumps(args.arguments, indent=2)}"
    )
    async with aiohttp.ClientSession() as session:
        try:
            response = await session.post(
                f"{_BASE_URL}/{args.owner}/{args.repo}/actions/workflows/{args.workflow_id}/dispatches",
                headers=headers,
                json=payload,
            )
            response.raise_for_status()
        except aiohttp.ClientResponseError as e:
            _fail(f"Failed to trigger workflow run: {e.message} ({e.status})")

        run, run_id = await _find_triggered_run(session, args, headers, unique_id)

        if run.get("status") in _IN_PROGRESS_STATUSES:
            run = await _wait_for_completion(session, args, headers, run_id, unique_id)

        # Per the GitHub REST API, "status" reports only lifecycle states
        # (queued/in_progress/completed/...); the outcome of a finished run is
        # in "conclusion" (success/failure/timed_out/cancelled/...). Checking
        # "status" alone would report every completed run — including failed
        # ones — as a success, so we must inspect "conclusion" as well.
        status = run.get("status")
        conclusion = run.get("conclusion")
        if status in _FAILED_STATUSES or conclusion in _FAILED_STATUSES:
            _fail(f"Github workflow {args.owner}/{args.repo}/{args.workflow_id} run {run_id} failed with error")
        if status in _COMPLETED_STATUSES:
            if conclusion is not None and conclusion not in _COMPLETED_STATUSES:
                # Completed, but with an unexpected outcome such as
                # "cancelled" or "action_required".
                _fail(
                    f"Github workflow {args.owner}/{args.repo}/{args.workflow_id} "
                    f"run {run_id} completed with conclusion {conclusion}"
                )
            log.info(f"Workflow {args.owner}/{args.repo}/{args.workflow_id} run {run_id} completed successfully")
            return results.GithubActionsWorkflow(
                kind="github_actions_workflow", name=args.name, run_id=run_id, url=run.get("html_url", "")
            )
        _fail(f"Timed out waiting for workflow {args.owner}/{args.repo}/{args.workflow_id}")
+
def _fail(message: str) -> NoReturn:
    """Record *message* in the error log, then raise RuntimeError to fail the task."""
    error = RuntimeError(message)
    log.error(message)
    raise error
+
+
async def _find_triggered_run(
    session: aiohttp.ClientSession,
    args: GithubActionsWorkflow,
    headers: dict[str, str],
    unique_id: str,
) -> tuple[dict[str, Any], int]:
    """Locate the workflow run dispatched with *unique_id* and return it with its ID."""

    def _select(payload: dict[str, Any]) -> dict[str, Any] | None:
        # The run we triggered is the one on our ref whose name echoes unique_id.
        for candidate in payload["workflow_runs"]:
            if candidate["head_branch"] == args.ref and candidate["name"] == unique_id:
                return candidate
        return None

    listing_url = f"{_BASE_URL}/{args.owner}/{args.repo}/actions/runs?event=workflow_dispatch"
    run = await _request_and_retry(session, listing_url, headers, _select)
    if run is None:
        _fail(f"Failed to find triggered workflow run for {unique_id}")
    run_id: int | None = run.get("id")
    if run_id is None:
        _fail(f"Found run for {unique_id} but run ID is missing")
    return run, run_id
+
+
async def _request_and_retry(
    session: aiohttp.ClientSession,
    url: str,
    headers: dict[str, str],
    response_func: Callable[[Any], dict[str, Any] | None],
) -> dict[str, Any] | None:
    """Poll *url* until *response_func* extracts a result, or give up.

    Makes up to _TIMEOUT_S * 10 GET requests roughly 0.1 s apart (about
    _TIMEOUT_S seconds in total). Each JSON response body is passed to
    *response_func*; the first truthy value it returns is the result.
    HTTP errors are logged and retried rather than raised, since they may be
    ephemeral; returns None when every attempt fails or yields nothing.
    """
    for attempt in range(_TIMEOUT_S * 10):
        async with session.get(url, headers=headers) as response:
            try:
                response.raise_for_status()
                body = await response.json()
            except aiohttp.ClientResponseError as e:
                # Could be an ephemeral error; if it persists we fall through
                # and return None after the final attempt.
                log.error(f"Failure calling Github: {e.message} ({e.status}, attempt {attempt + 1})")
            else:
                data = response_func(body)
                if data:
                    return data
        # Sleep outside the response context so the connection is released
        # while we wait.
        await asyncio.sleep(0.1)
    return None
+
+
async def _wait_for_completion(
    session: aiohttp.ClientSession,
    args: GithubActionsWorkflow,
    headers: dict[str, str],
    run_id: int,
    unique_id: str,
) -> dict[str, Any]:
    """Poll the run identified by *run_id* until it is no longer in progress."""

    def _settled(payload: dict[str, Any]) -> dict[str, Any] | None:
        # Keep polling while the run still reports an in-progress status.
        in_progress = payload.get("status") in _IN_PROGRESS_STATUSES
        return None if in_progress else payload

    run_url = f"{_BASE_URL}/{args.owner}/{args.repo}/actions/runs/{run_id}"
    run = await _request_and_retry(session, run_url, headers, _settled)
    if run is None:
        _fail(f"Failed to find triggered workflow run for {unique_id}")
    return run
diff --git a/migrations/versions/0035_2026.01.07_7a420374.py
b/migrations/versions/0035_2026.01.07_7a420374.py
new file mode 100644
index 0000000..bd3fb53
--- /dev/null
+++ b/migrations/versions/0035_2026.01.07_7a420374.py
@@ -0,0 +1,72 @@
+"""Add github workflow task type
+
+Revision ID: 0035_2026.01.07_7a420374
+Revises: 0034_2025.12.31_ac4dcf44
+Create Date: 2026-01-07 10:26:21.975721+00:00
+"""
+
+from collections.abc import Sequence
+
+import sqlalchemy as sa
+from alembic import op
+
+# Revision identifiers, used by Alembic
+revision: str = "0035_2026.01.07_7a420374"
+down_revision: str | None = "0034_2025.12.31_ac4dcf44"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
def upgrade() -> None:
    """Widen the task.task_type enum to include GITHUB_ACTION_WORKFLOW."""
    task_types = (
        "GITHUB_ACTION_WORKFLOW",
        "HASHING_CHECK",
        "KEYS_IMPORT_FILE",
        "LICENSE_FILES",
        "LICENSE_HEADERS",
        "MESSAGE_SEND",
        "PATHS_CHECK",
        "RAT_CHECK",
        "SBOM_GENERATE_CYCLONEDX",
        "SIGNATURE_CHECK",
        "SVN_IMPORT_FILES",
        "TARGZ_INTEGRITY",
        "TARGZ_STRUCTURE",
        "VOTE_INITIATE",
        "ZIPFORMAT_INTEGRITY",
        "ZIPFORMAT_STRUCTURE",
    )
    with op.batch_alter_table("task", schema=None) as batch_op:
        batch_op.alter_column(
            "task_type",
            existing_type=sa.VARCHAR(length=23),
            type_=sa.Enum(*task_types, name="tasktype"),
            existing_nullable=False,
        )
+
+
def downgrade() -> None:
    """Restore the task.task_type enum without GITHUB_ACTION_WORKFLOW.

    NOTE(review): existing rows whose task_type is GITHUB_ACTION_WORKFLOW are
    not converted or removed here — confirm that is acceptable before
    downgrading.
    """
    task_types = (
        "HASHING_CHECK",
        "KEYS_IMPORT_FILE",
        "LICENSE_FILES",
        "LICENSE_HEADERS",
        "MESSAGE_SEND",
        "PATHS_CHECK",
        "RAT_CHECK",
        "SBOM_GENERATE_CYCLONEDX",
        "SIGNATURE_CHECK",
        "SVN_IMPORT_FILES",
        "TARGZ_INTEGRITY",
        "TARGZ_STRUCTURE",
        "VOTE_INITIATE",
        "ZIPFORMAT_INTEGRITY",
        "ZIPFORMAT_STRUCTURE",
    )
    with op.batch_alter_table("task", schema=None) as batch_op:
        batch_op.alter_column(
            "task_type",
            existing_type=sa.VARCHAR(length=23),
            type_=sa.Enum(*task_types, name="tasktype"),
            existing_nullable=False,
        )
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]