This is an automated email from the ASF dual-hosted git repository.
akm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/tooling-agents.git
The following commit(s) were added to refs/heads/main by this push:
new e0b02fb Bucketing categories
e0b02fb is described below
commit e0b02fb65ecac3d0269c6b0822e7e644e9fda85f
Author: Andrew Musselman <[email protected]>
AuthorDate: Thu Apr 2 11:57:42 2026 -0700
Bucketing categories
---
repos/apache/github-workflows/code.py | 537 ++++++++++++++++++----------------
1 file changed, 292 insertions(+), 245 deletions(-)
diff --git a/repos/apache/github-workflows/code.py b/repos/apache/github-workflows/code.py
index 9841ee0..8d8ff40 100644
--- a/repos/apache/github-workflows/code.py
+++ b/repos/apache/github-workflows/code.py
@@ -12,11 +12,9 @@ async def run(input_dict, tools):
github_pat = input_dict.get("github_pat", "").strip()
clear_cache_raw = input_dict.get("clear_cache", "false")
- # Parse string flags
all_repos = str(all_repos_raw).lower().strip() in ("true", "1", "yes")
clear_cache = str(clear_cache_raw).lower().strip() in ("true", "1",
"yes")
- # --- Validation ---
if not github_pat and (all_repos or not repos_str):
return {"outputText": "Error: `github_pat` is required for
org-wide scanning. "
"Unauthenticated GitHub API limit is 60 req/hr — too low
for this agent.\n\n"
@@ -26,23 +24,19 @@ async def run(input_dict, tools):
return {"outputText": "Error: provide a comma-separated repo list
in `repos`, "
"or set `all_repos` to `true` to scan the entire org."}
- # --- LLM Config ---
provider = "bedrock"
model = "us.anthropic.claude-sonnet-4-5-20250929-v1:0"
- configured_params = {"temperature": 0, "reasoning_effort": "disable",
"max_tokens": 1024}
+ configured_params = {"temperature": 0, "reasoning_effort": "disable",
"max_tokens": 2048}
- # --- GitHub API Config ---
GITHUB_API = "https://api.github.com"
gh_headers = {"Accept": "application/vnd.github.v3+json"}
if github_pat:
gh_headers["Authorization"] = f"token {github_pat}"
- # --- Data store caching ---
classification_cache = data_store.use_namespace(f"ci-classification:{owner}")
workflow_content_cache = data_store.use_namespace(f"ci-workflows:{owner}")
report_ns = data_store.use_namespace(f"ci-report:{owner}")
- # --- Clear cache if requested ---
if clear_cache:
print("Clearing cached data...", flush=True)
for ns in [classification_cache, workflow_content_cache,
report_ns]:
@@ -50,17 +44,12 @@ async def run(input_dict, tools):
ns.delete(key)
print("Cache cleared.", flush=True)
- # --- Preflight: verify PAT and API access ---
+ # --- Preflight ---
print("Running preflight checks...", flush=True)
-
- preflight_resp = await http_client.get(
- f"{GITHUB_API}/rate_limit",
- headers=gh_headers,
- timeout=15.0
- )
+ preflight_resp = await http_client.get(f"{GITHUB_API}/rate_limit",
headers=gh_headers, timeout=15.0)
if preflight_resp.status_code == 401:
- return {"outputText": "Error: GitHub PAT is invalid or expired.
HTTP 401 from /rate_limit.\n\n"
+ return {"outputText": "Error: GitHub PAT is invalid or expired
(HTTP 401).\n"
"Check your token at https://github.com/settings/tokens"}
if preflight_resp.status_code == 200:
@@ -71,40 +60,20 @@ async def run(input_dict, tools):
print(f" GitHub API: {remaining}/{limit} requests remaining",
flush=True)
if isinstance(remaining, int) and remaining < 50:
print(f" WARNING: Very low rate limit remaining!", flush=True)
- else:
- print(f" WARNING: /rate_limit returned HTTP
{preflight_resp.status_code}", flush=True)
if repos_str:
test_repo = repos_str.split(",")[0].strip()
- else:
- test_repo = None
-
- if test_repo:
test_url = f"{GITHUB_API}/repos/{owner}/{test_repo}/contents/.github/workflows"
print(f" Testing access: {test_url}", flush=True)
test_resp = await http_client.get(test_url, headers=gh_headers,
timeout=15.0)
print(f" Response: HTTP {test_resp.status_code}", flush=True)
-
if test_resp.status_code == 404:
root_resp = await http_client.get(
- f"{GITHUB_API}/repos/{owner}/{test_repo}",
- headers=gh_headers, timeout=15.0
- )
+ f"{GITHUB_API}/repos/{owner}/{test_repo}",
headers=gh_headers, timeout=15.0)
if root_resp.status_code == 404:
- return {"outputText": f"Error: repo `{owner}/{test_repo}`
not found (HTTP 404). "
- "Check the repo name and PAT permissions."}
- else:
- print(f" Repo exists but has no .github/workflows
directory.", flush=True)
- elif test_resp.status_code == 200:
- test_files = test_resp.json()
- yaml_count = len([f for f in test_files if isinstance(f, dict)
and f.get("name", "").endswith((".yml", ".yaml"))])
- print(f" Found {yaml_count} YAML files in
{test_repo}/.github/workflows/", flush=True)
+ return {"outputText": f"Error: repo `{owner}/{test_repo}`
not found (HTTP 404)."}
elif test_resp.status_code == 403:
- return {"outputText": f"Error: HTTP 403 accessing
`{owner}/{test_repo}`. "
- "Your PAT may lack the `repo` or `contents:read`
scope.\n\n"
- f"Response: {test_resp.text[:300]}"}
- else:
- print(f" Unexpected: HTTP {test_resp.status_code} —
{test_resp.text[:200]}", flush=True)
+ return {"outputText": f"Error: HTTP 403 accessing
`{owner}/{test_repo}`.\n{test_resp.text[:300]}"}
print("Preflight complete.\n", flush=True)
@@ -139,44 +108,58 @@ async def run(input_dict, tools):
"Key signal: images pushed to ghcr.io/{org}/{repo} with cache
tags, or test images to gcr.io/{org}-testing/*.\n"
"- documentation: Publishing docs, websites, metrics dashboards,
coverage reports. "
"Examples: S3 sync of docs, GitHub Pages deploy, GCS website
upload, Codecov upload.\n"
- "- none: Workflow does not publish anything to any registry or
external location.\n\n"
+ "- none: Workflow does not publish anything to any registry or
external location. "
+ "This includes: uploading to GitHub Actions artifact storage
(actions/upload-artifact) which is ephemeral, "
+ "git push to branches within the same repo, creating GitHub
Release notes without downloadable binaries.\n\n"
"IMPORTANT DISTINCTIONS:\n"
"- A workflow that pushes CI cache images to GHCR is
ci_infrastructure, NOT release_artifact.\n"
"- A workflow that uploads docs to S3 is documentation, NOT
release_artifact.\n"
"- A workflow that pushes test catalog data to a git branch is
none (it's just a git commit).\n"
"- A workflow that uploads wheels to GCS for staging (not PyPI) is
snapshot_artifact.\n"
"- A workflow that creates a GitHub Release with notes is none
unless it also attaches downloadable artifacts.\n"
- "- Coverage uploads (Codecov) are documentation, not
publishing.\n\n"
- "ECOSYSTEM VALUES — use these exact strings when applicable:\n"
- "maven_central, pypi, docker_hub, npm, crates_io, nuget, rubygems,
apache_dist, helm, "
- "ghcr (GitHub Container Registry), gcr (Google Container
Registry), "
- "gcs (Google Cloud Storage), s3 (AWS S3), github_pages,
github_packages\n\n"
- "SECURITY ANALYSIS — be precise about injection risk levels:\n"
- "- CRITICAL: Direct ${{ }} interpolation inside a `run:` block. "
- "This is a real script injection vector because the expression is
expanded BEFORE the shell script is created. "
- "Example: `run: echo ${{ inputs.foo }}`\n"
- "- SAFE (do NOT flag): Values passed through `env:` blocks then
referenced as shell variables. "
- "Example: `env: FOO: ${{ inputs.foo }}` with `run: echo
\"${FOO}\"`. "
- "This is the recommended secure pattern.\n"
- "- SAFE (do NOT flag): ${{ }} used only in `with:` blocks passed
to actions. "
- "Actions receive these as input parameters, not shell-interpolated
strings.\n"
- "- SAFE (do NOT flag): ${{ }} used in `concurrency.group`. "
- "Concurrency groups are GitHub Actions configuration, NOT shell
execution contexts. Never flag these as CRITICAL.\n"
- "- LOW: GitHub-controlled values (github.actor, github.sha,
github.repository) directly in `run:` blocks. "
- "Not user-injectable but poor practice.\n"
- "- For each note, return a STRING (not an object) prefixed with
the risk level in brackets. "
- "Example: \"[CRITICAL] Direct interpolation of inputs.version in
run block at step 'Deploy'\"\n\n"
+ "- Coverage uploads (Codecov) are documentation, not publishing.\n"
+ "- Upload to GitHub Actions artifacts (actions/upload-artifact) is
none — that's ephemeral CI storage, not a registry.\n\n"
+ "ECOSYSTEM VALUES — use ONLY these exact strings:\n"
+ "maven_central (for ALL Maven repos including Apache Snapshots,
Nexus staging, etc.), "
+ "pypi, docker_hub, npm, crates_io, nuget, rubygems, apache_dist,
helm, "
+ "ghcr, gcr, gcs, s3, github_pages, github_packages,
github_releases\n"
+ "DO NOT invent new ecosystem names like 'maven_snapshots' or
'apache_snapshots' — "
+ "use maven_central for all Maven/Gradle/Nexus publishing.\n\n"
+ "SECURITY ANALYSIS — be precise about the TRUST LEVEL of
interpolated values:\n\n"
+ "CRITICAL (real injection risk from untrusted external input):\n"
+ " Direct ${{ }} interpolation of UNTRUSTED user input in a `run:`
block. "
+ "Untrusted input includes: github.event.pull_request.title,
github.event.issue.body, "
+ "github.event.comment.body, github.event.pull_request.head.ref,
github.event.review.body, "
+ "or any value controllable by external contributors who are not
committers.\n\n"
+ "LOW (secret/credential leakage risk — values are trusted but
should use env: pattern):\n"
+ " Direct ${{ secrets.* }} interpolation in a `run:` block. The
secret value is trusted "
+ "(not attacker-controlled), but direct interpolation risks leaking
it in logs if GitHub's "
+ "automatic masking is bypassed. Best practice: pass through env:
block.\n\n"
+ "LOW (trusted committer input — not externally exploitable):\n"
+ " Direct ${{ github.event.inputs.* }} interpolation in `run:`
blocks for workflow_dispatch workflows "
+ "that are only triggerable by repository committers/maintainers.
The inputs are free-form text "
+ "but from trusted sources. Main risk is accidental command
injection from malformed version strings, "
+ "not malicious exploitation. Note: if the workflow is triggerable
by external contributors, "
+ "this becomes CRITICAL.\n\n"
+ "LOW (GitHub-controlled values):\n"
+ " github.actor, github.sha, github.repository, github.ref
directly in `run:` blocks. "
+ "Not user-injectable.\n\n"
+ "SAFE (do NOT flag):\n"
+ " - Values passed through `env:` blocks then referenced as shell
variables.\n"
+ " - ${{ }} used only in `with:` blocks passed to actions.\n"
+ " - ${{ }} used in `concurrency.group` (not a shell execution
context).\n\n"
+ "For each note, return a STRING prefixed with [CRITICAL], [LOW],
or [INFO].\n"
+ "Example: \"[LOW] secrets.NEXUS_PW directly interpolated in run
block at step 'Auth'. "
+ "Trusted value but risks log leakage. Use env: block
instead.\"\n\n"
"If no publishing detected, set publishes_to_registry to false,
category to \"none\", and ecosystems to []."
)
- # Pre-compute prompt overhead for token-aware truncation
prompt_tokens = count_tokens(CLASSIFICATION_PROMPT, provider, model)
ctx_window = get_context_window(provider, model)
max_yaml_tokens = int(ctx_window * 0.75) - prompt_tokens - 300
async def github_get(url, params=None, max_retries=5):
- """GitHub API request with rate limit handling and retries."""
last_resp = None
for attempt in range(max_retries):
try:
@@ -206,33 +189,26 @@ async def run(input_dict, tools):
if remaining:
try:
rem_int = int(remaining)
- if rem_int < 20:
- print(f" WARNING: Only {rem_int} API requests
remaining!", flush=True)
- await asyncio.sleep(10)
+ if rem_int < 50:
+ print(f" WARNING: {rem_int} API requests
remaining", flush=True)
+ await asyncio.sleep(5)
elif rem_int < 100:
await asyncio.sleep(2)
except ValueError:
pass
return resp
-
return last_resp
def parse_classification(raw_text):
- """Parse LLM JSON response, handling common formatting issues."""
cleaned = raw_text.strip()
-
if cleaned.startswith("```"):
first_nl = cleaned.find("\n")
- if first_nl != -1:
- cleaned = cleaned[first_nl + 1:]
- else:
- cleaned = cleaned[3:]
+ cleaned = cleaned[first_nl + 1:] if first_nl != -1 else
cleaned[3:]
if cleaned.endswith("```"):
cleaned = cleaned[:-3]
cleaned = cleaned.strip()
-
if not cleaned.startswith("{"):
start = cleaned.find("{")
if start != -1:
@@ -241,12 +217,10 @@ async def run(input_dict, tools):
end = cleaned.rfind("}")
if end != -1:
cleaned = cleaned[:end + 1]
-
return json.loads(cleaned)
def safe_str(val):
- """Coerce any value to a stripped string."""
if val is None:
return ""
if isinstance(val, dict):
@@ -257,7 +231,6 @@ async def run(input_dict, tools):
def normalize_note(note):
- """Coerce a security note to a formatted string, handling both str
and dict."""
if isinstance(note, str):
return note.strip()
if isinstance(note, dict):
@@ -268,38 +241,122 @@ async def run(input_dict, tools):
def downgrade_contradictions(text):
- """If a CRITICAL note also says env-mediated/safe pattern,
downgrade to INFO."""
+ """Refine severity based on what the note actually describes."""
if "[CRITICAL]" not in text:
return text
- safe_phrases = ["env-mediated", "safe pattern", "passed through
env",
- "through env: block", "through env block", "env:
block first",
- "passed through env:", "env vars (which is safer)"]
- for phrase in safe_phrases:
- if phrase.lower() in text.lower():
+ text_lower = text.lower()
+
+ # Env-mediated patterns are safe, not even LOW
+ env_phrases = [
+ "env-mediated", "safe pattern", "passed through env",
+ "through env: block", "through env block", "env: block first",
+ "via env", "in env block", "environment variable usage",
+ "env block passed to", "via environment variable",
+ "environment variable",
+ ]
+ for phrase in env_phrases:
+ if phrase.lower() in text_lower:
return text.replace("[CRITICAL]", "[INFO-DOWNGRADED]")
+
+ # Secrets interpolation — trusted values, leakage risk only
+ secret_phrases = [
+ "secrets.", "secret.", "nexus_user", "nexus_pw",
"nexus_password",
+ "dockerhub_token", "dockerhub_user", "pypi_api_token",
+ "gpg_private_key", "gpg_passphrase", "gpg_secret_key",
+ "svn_username", "svn_password",
+ ]
+ for phrase in secret_phrases:
+ if phrase.lower() in text_lower:
+ return text.replace("[CRITICAL]", "[LOW-LEAKAGE]")
+
+ # workflow_dispatch inputs from committers — trusted but sloppy
+ dispatch_phrases = [
+ "workflow_dispatch input", "inputs.release", "inputs.rc",
+ "inputs.apache_id", "inputs.apache_password",
"inputs.repo_token",
+ "inputs.version", "inputs.branch", "github.event.inputs.",
+ ]
+ for phrase in dispatch_phrases:
+ if phrase.lower() in text_lower:
+ return text.replace("[CRITICAL]", "[LOW-TRUSTED-INPUT]")
+
return text
+ ECOSYSTEM_ALIASES = {
+ "maven_snapshots": "maven_central",
+ "apache_snapshots": "maven_central",
+ "maven": "maven_central",
+ "nexus": "maven_central",
+ "docker": "docker_hub",
+ "dockerhub": "docker_hub",
+ "github_artifacts": "github_actions_artifacts",
+ }
+
+ def normalize_ecosystem(eco):
+ eco = safe_str(eco).lower().strip().replace(" ", "_")
+ return ECOSYSTEM_ALIASES.get(eco, eco)
+
+
+ TRUSTED_PUBLISHING_ECOSYSTEMS = {
+ "pypi": {
+ "label": "PyPI",
+ "mechanism": "OIDC Trusted Publisher via
pypa/gh-action-pypi-publish",
+ "docs": "https://docs.pypi.org/trusted-publishers/",
+ },
+ "npm": {
+ "label": "npm",
+ "mechanism": "npm provenance with OIDC",
+ "docs":
"https://docs.npmjs.com/generating-provenance-statements",
+ },
+ "nuget": {
+ "label": "NuGet",
+ "mechanism": "Sigstore-based Trusted Publishing",
+ "docs":
"https://devblogs.microsoft.com/nuget/introducing-trusted-publishers/",
+ },
+ "rubygems": {
+ "label": "RubyGems",
+ "mechanism": "OIDC Trusted Publisher",
+ "docs": "https://guides.rubygems.org/trusted-publishing/",
+ },
+ "crates_io": {
+ "label": "crates.io",
+ "mechanism": "OIDC Trusted Publishing",
+ "docs":
"https://doc.rust-lang.org/cargo/reference/registry-authentication.html",
+ },
+ }
+
+ TOKEN_PATTERNS = [
+ "token", "password", "secret", "api_key", "apikey",
+ "nexus_user", "nexus_pw", "pypi_api_token",
+ ]
+
+ def uses_long_lived_token(auth_str):
+ auth_lower = auth_str.lower()
+ if "oidc" in auth_lower or "trusted publisher" in auth_lower or
"id-token" in auth_lower:
+ return False
+ for pat in TOKEN_PATTERNS:
+ if pat in auth_lower:
+ return True
+ return False
+
+
def sanitize_md(value):
- """Sanitize text for safe inclusion in Markdown tables."""
if not value:
return "N/A"
return str(value).replace("|", "∣").replace("\n", " ").strip()
def truncate_yaml(content):
- """Token-aware truncation of workflow YAML to fit context
window."""
yaml_tokens = count_tokens(content, provider, model)
if yaml_tokens <= max_yaml_tokens:
return content
-
lines_list = content.split("\n")
truncated = []
running = 0
for line in lines_list:
lt = count_tokens(line, provider, model)
if running + lt > max_yaml_tokens:
- truncated.append("# ... [TRUNCATED — file too large for
single classification] ...")
+ truncated.append("# ... [TRUNCATED] ...")
break
truncated.append(line)
running += lt
@@ -319,51 +376,41 @@ async def run(input_dict, tools):
if all_repos:
print(f"Fetching all repos for {owner}...", flush=True)
repo_names = []
+ skipped_archived = 0
page = 1
while True:
resp = await github_get(
f"{GITHUB_API}/orgs/{owner}/repos",
- params={"per_page": 100, "page": page, "sort": "pushed",
"type": "public"}
- )
-
+ params={"per_page": 100, "page": page, "sort": "pushed",
"type": "public"})
if resp is None or resp.status_code != 200:
- if resp:
- print(f" Failed to fetch page {page}: HTTP
{resp.status_code}", flush=True)
break
-
page_data = resp.json()
if not page_data or not isinstance(page_data, list):
break
-
- repo_names.extend([r["name"] for r in page_data if
isinstance(r, dict) and "name" in r])
-
+ for r in page_data:
+ if isinstance(r, dict) and "name" in r:
+ if r.get("archived"):
+ skipped_archived += 1
+ else:
+ repo_names.append(r["name"])
if 'rel="next"' not in resp.headers.get("Link", ""):
break
-
page += 1
await asyncio.sleep(0.3)
-
- print(f"Found {len(repo_names)} repos in {owner}", flush=True)
+ print(f"Found {len(repo_names)} active repos ({skipped_archived}
archived skipped)", flush=True)
else:
repo_names = [r.strip() for r in repos_str.split(",") if r.strip()]
print(f"Using provided list of {len(repo_names)} repos",
flush=True)
if not repo_names:
- return {"outputText": f"# CI Registry Publishing Analysis:
{owner}\n\n"
- "No repositories found. Check the owner name and GitHub
PAT permissions."}
+ return {"outputText": f"# CI Registry Publishing Analysis:
{owner}\n\nNo repositories found."}
print(f"\nStarting workflow scan of {len(repo_names)} repos...\n",
flush=True)
# ===== STEP 2: Fetch workflows and classify =====
all_results = {}
- stats = {
- "repos_scanned": 0,
- "repos_with_workflows": 0,
- "total_workflows": 0,
- "total_classified": 0,
- "cache_hits": 0,
- "errors": [],
- }
+ stats = {"repos_scanned": 0, "repos_with_workflows": 0,
"total_workflows": 0,
+ "total_classified": 0, "cache_hits": 0, "errors": []}
for repo_idx, repo_name in enumerate(repo_names):
stats["repos_scanned"] += 1
@@ -373,7 +420,6 @@ async def run(input_dict, tools):
f"({stats['total_workflows']} wfs,
{stats['total_classified']} classified, "
f"{stats['cache_hits']} cached)", flush=True)
- # Check repo-level cache
meta_key = f"__meta__:{repo_name}"
cached_meta = classification_cache.get(meta_key)
@@ -393,21 +439,16 @@ async def run(input_dict, tools):
stats["total_classified"] += len(repo_results)
continue
- # Fetch workflow directory listing
resp = await github_get(f"{GITHUB_API}/repos/{owner}/{repo_name}/contents/.github/workflows")
if resp is None:
- print(f" {repo_name}: no response (network error), skipping",
flush=True)
- stats["errors"].append(f"{owner}/{repo_name}: network error
fetching workflow list")
+ stats["errors"].append(f"{owner}/{repo_name}: network error")
continue
-
if resp.status_code == 404:
classification_cache.set(meta_key, {"complete": True,
"workflows": []})
continue
-
if resp.status_code != 200:
- print(f" {repo_name}: HTTP {resp.status_code} fetching
workflows, skipping", flush=True)
- stats["errors"].append(f"{owner}/{repo_name}: HTTP
{resp.status_code} fetching workflow list")
+ stats["errors"].append(f"{owner}/{repo_name}: HTTP
{resp.status_code}")
continue
try:
@@ -420,10 +461,8 @@ async def run(input_dict, tools):
classification_cache.set(meta_key, {"complete": True,
"workflows": []})
continue
- yaml_files = [
- f for f in dir_listing
- if isinstance(f, dict) and f.get("name", "").endswith((".yml",
".yaml"))
- ]
+ yaml_files = [f for f in dir_listing
+ if isinstance(f, dict) and f.get("name",
"").endswith((".yml", ".yaml"))]
if not yaml_files:
classification_cache.set(meta_key, {"complete": True,
"workflows": []})
@@ -438,7 +477,6 @@ async def run(input_dict, tools):
workflow_names.append(wf_name)
stats["total_workflows"] += 1
- # Check per-workflow cache
wf_cache_key = f"{repo_name}:{wf_name}"
cached_cls = classification_cache.get(wf_cache_key)
if cached_cls:
@@ -447,10 +485,8 @@ async def run(input_dict, tools):
stats["cache_hits"] += 1
continue
- # Fetch raw content (download_url bypasses API rate limit)
raw_url = wf_file.get("download_url")
yaml_content = None
-
if raw_url:
try:
content_resp = await http_client.get(raw_url,
follow_redirects=True, timeout=30.0)
@@ -460,36 +496,22 @@ async def run(input_dict, tools):
pass
if yaml_content is None:
- error_result = {"file": wf_name, "error": "Could not fetch
content", "publishes_to_registry": None}
- repo_results.append(error_result)
+ repo_results.append({"file": wf_name, "error": "Could not
fetch", "publishes_to_registry": None})
continue
- # Store raw workflow content for other agents
workflow_content_cache.set(f"{repo_name}/{wf_name}",
yaml_content)
-
- # Token-aware truncation
yaml_content = truncate_yaml(yaml_content)
- # Classify with LLM
llm_response = None
try:
- messages = [
- {"role": "user",
- "content": (
- f"{CLASSIFICATION_PROMPT}\n\n---\n"
- f"File:
{owner}/{repo_name}/.github/workflows/{wf_name}\n"
- f"---\n\n{yaml_content}"
- )}
- ]
+ messages = [{"role": "user", "content": (
+ f"{CLASSIFICATION_PROMPT}\n\n---\n"
+ f"File:
{owner}/{repo_name}/.github/workflows/{wf_name}\n---\n\n{yaml_content}"
+ )}]
llm_response, _ = await call_llm(
- provider=provider,
- model=model,
- messages=messages,
- parameters=configured_params,
- user_service=None,
- user_id=None,
- )
+ provider=provider, model=model, messages=messages,
+ parameters=configured_params, user_service=None,
user_id=None)
classification = parse_classification(llm_response)
classification["file"] = wf_name
@@ -498,63 +520,38 @@ async def run(input_dict, tools):
stats["total_classified"] += 1
except json.JSONDecodeError:
- error_result = {
- "file": wf_name,
- "error": "JSON parse error",
- "raw_response": (llm_response or "")[:300],
- "publishes_to_registry": None,
- }
- repo_results.append(error_result)
+ repo_results.append({"file": wf_name, "error": "JSON parse
error",
+ "raw_response": (llm_response or
"")[:300], "publishes_to_registry": None})
stats["errors"].append(f"{owner}/{repo_name}/.github/workflows/{wf_name}: JSON parse error")
-
except Exception as e:
- error_result = {
- "file": wf_name,
- "error": str(e)[:200],
- "publishes_to_registry": None,
- }
- repo_results.append(error_result)
+ repo_results.append({"file": wf_name, "error":
str(e)[:200], "publishes_to_registry": None})
stats["errors"].append(f"{owner}/{repo_name}/.github/workflows/{wf_name}: {str(e)[:80]}")
await asyncio.sleep(0.3)
if repo_results:
all_results[repo_name] = repo_results
-
classification_cache.set(meta_key, {"complete": True, "workflows":
workflow_names})
print(f"\n{'=' * 60}", flush=True)
- print(f"Scan complete!", flush=True)
- print(f" Repos scanned: {stats['repos_scanned']}", flush=True)
- print(f" Repos with workflows: {stats['repos_with_workflows']}",
flush=True)
- print(f" Total workflows: {stats['total_workflows']}", flush=True)
- print(f" Classified: {stats['total_classified']}
({stats['cache_hits']} from cache)", flush=True)
- if stats["errors"]:
- print(f" Errors: {len(stats['errors'])}", flush=True)
+ print(f"Scan complete! {stats['repos_scanned']} repos,
{stats['total_classified']} classified "
+ f"({stats['cache_hits']} cached), {len(stats['errors'])}
errors", flush=True)
print(f"{'=' * 60}\n", flush=True)
- # ===== STEP 3: Build Markdown report =====
+ # ===== STEP 3: Build report =====
- if len(repo_names) == 1:
- report_title = f"CI Registry Publishing Analysis:
{owner}/{repo_names[0]}"
- else:
- report_title = f"CI Registry Publishing Analysis: {owner}"
+ report_title = (f"CI Registry Publishing Analysis:
{owner}/{repo_names[0]}"
+ if len(repo_names) == 1
+ else f"CI Registry Publishing Analysis: {owner}")
lines = []
lines.append(f"Scanned **{stats['repos_scanned']}** repositories, "
f"**{stats['repos_with_workflows']}** had GitHub Actions
workflow files, "
f"**{stats['total_workflows']}** total workflows
analyzed.\n")
- # --- Collect all publishing workflows by category ---
- by_category = {
- "release_artifact": [],
- "snapshot_artifact": [],
- "ci_infrastructure": [],
- "documentation": [],
- }
+ # --- Collect workflows by category ---
+ by_category = {"release_artifact": [], "snapshot_artifact": [],
"ci_infrastructure": [], "documentation": []}
ecosystem_counts = {}
- auth_methods_agg = {}
- trigger_types_agg = {}
security_notes_all = []
publishing_repos = set()
@@ -564,34 +561,27 @@ async def run(input_dict, tools):
continue
cat = safe_str(w.get("category")).lower().strip()
+ ecosystems_raw = [normalize_ecosystem(e) for e in
(w.get("ecosystems") or [])]
+
+ if cat == "snapshot_artifact" and all(e ==
"github_actions_artifacts" for e in ecosystems_raw):
+ continue
+
if cat not in by_category:
cat = "release_artifact"
- by_category[cat].append({"repo": repo, **w})
+ entry = {"repo": repo, **w}
+ entry["ecosystems"] = ecosystems_raw
+ by_category[cat].append(entry)
publishing_repos.add(repo)
- for eco in (w.get("ecosystems") or []):
- eco_key = safe_str(eco).lower().replace(" ", "_")
- if eco_key:
- ecosystem_counts[eco_key] =
ecosystem_counts.get(eco_key, 0) + 1
-
- auth = safe_str(w.get("auth_method"))
- if auth:
- auth_methods_agg[auth] = auth_methods_agg.get(auth, 0) + 1
-
- trigger = safe_str(w.get("trigger"))
- if trigger:
- trigger_types_agg[trigger] =
trigger_types_agg.get(trigger, 0) + 1
+ for eco in ecosystems_raw:
+ if eco and eco != "github_actions_artifacts":
+ ecosystem_counts[eco] = ecosystem_counts.get(eco, 0) +
1
for raw_note in (w.get("security_notes") or []):
note = downgrade_contradictions(normalize_note(raw_note))
if raw_note else ""
if note:
- security_notes_all.append({
- "repo": repo,
- "file": w.get("file", "?"),
- "note": note,
- "category": cat,
- })
+ security_notes_all.append({"repo": repo, "file":
w.get("file", "?"), "note": note, "category": cat})
release_wfs = by_category["release_artifact"]
snapshot_wfs = by_category["snapshot_artifact"]
@@ -613,13 +603,12 @@ async def run(input_dict, tools):
lines.append(f"| Security notes flagged | {len(security_notes_all)} |")
lines.append("")
- # --- Ecosystem Distribution (release + snapshot only) ---
+ # --- Ecosystem Distribution ---
release_ecosystems = {}
for w in release_wfs + snapshot_wfs:
for eco in (w.get("ecosystems") or []):
- eco_key = safe_str(eco).lower().replace(" ", "_")
- if eco_key:
- release_ecosystems[eco_key] =
release_ecosystems.get(eco_key, 0) + 1
+ if eco and eco != "github_actions_artifacts":
+ release_ecosystems[eco] = release_ecosystems.get(eco, 0) +
1
if release_ecosystems:
lines.append("## Package Ecosystem Distribution (releases +
snapshots only)\n")
@@ -631,7 +620,47 @@ async def run(input_dict, tools):
lines.append(f"| {eco} | {count} | {pct:.1f}% |")
lines.append("")
- # --- Release Artifact Workflows ---
+ # --- Trusted Publishing Opportunities ---
+ tp_opportunities = []
+ for w in release_wfs + snapshot_wfs:
+ ecosystems = w.get("ecosystems") or []
+ auth = safe_str(w.get("auth_method"))
+ for eco in ecosystems:
+ if eco in TRUSTED_PUBLISHING_ECOSYSTEMS and
uses_long_lived_token(auth):
+ tp_info = TRUSTED_PUBLISHING_ECOSYSTEMS[eco]
+ tp_opportunities.append({
+ "repo": w.get("repo", "?"), "file": w.get("file", "?"),
+ "ecosystem": eco, "ecosystem_label": tp_info["label"],
+ "current_auth": auth, "mechanism":
tp_info["mechanism"],
+ "docs": tp_info["docs"], "category":
safe_str(w.get("category")),
+ })
+
+ if tp_opportunities:
+ lines.append("## Trusted Publishing Migration Opportunities\n")
+ lines.append("These workflows publish to ecosystems that support
OIDC Trusted Publishing "
+ "but currently use long-lived API tokens or
passwords. "
+ "Migrating to Trusted Publishing eliminates stored
secrets and reduces supply-chain risk.\n")
+
+ tp_by_eco = {}
+ for opp in tp_opportunities:
+ tp_by_eco.setdefault(opp["ecosystem"], []).append(opp)
+
+ for eco, opps in sorted(tp_by_eco.items()):
+ info = TRUSTED_PUBLISHING_ECOSYSTEMS[eco]
+ lines.append(f"### {info['label']}\n")
+ lines.append(f"**Available mechanism:** {info['mechanism']}")
+ lines.append(f"**Documentation:** {info['docs']}\n")
+ lines.append("| Repository | Workflow | Current Auth |
Category |")
+ lines.append("|------------|----------|-------------|----------|")
+ for opp in sorted(opps, key=lambda x: (x["repo"], x["file"])):
+ cat_label = CATEGORY_LABELS.get(opp["category"],
opp["category"])
+ lines.append(f"| {opp['repo']} | `{opp['file']}` |
{sanitize_md(opp['current_auth'])} | {cat_label} |")
+ lines.append("")
+ else:
+ lines.append("## Trusted Publishing Migration Opportunities\n")
+ lines.append("No workflows found that could migrate to Trusted
Publishing.\n")
+
+ # --- Release Workflows ---
if release_wfs:
lines.append("## Release Artifact Workflows\n")
lines.append("These workflows publish versioned packages to public
registries consumed by end users.\n")
@@ -642,11 +671,10 @@ async def run(input_dict, tools):
lines.append(
f"| {w['repo']} | `{w.get('file', '?')}` |
{sanitize_md(eco_str)} "
f"| {sanitize_md(safe_str(w.get('trigger')))} "
- f"| {sanitize_md(safe_str(w.get('auth_method')))} |"
- )
+ f"| {sanitize_md(safe_str(w.get('auth_method')))} |")
lines.append("")
- # --- Snapshot Artifact Workflows ---
+ # --- Snapshot Workflows ---
if snapshot_wfs:
lines.append("## Snapshot / Nightly Artifact Workflows\n")
lines.append("These workflows publish snapshot or nightly builds
to staging registries.\n")
@@ -657,8 +685,7 @@ async def run(input_dict, tools):
lines.append(
f"| {w['repo']} | `{w.get('file', '?')}` |
{sanitize_md(eco_str)} "
f"| {sanitize_md(safe_str(w.get('trigger')))} "
- f"| {sanitize_md(safe_str(w.get('auth_method')))} |"
- )
+ f"| {sanitize_md(safe_str(w.get('auth_method')))} |")
lines.append("")
# --- CI Infrastructure (collapsed) ---
@@ -671,8 +698,7 @@ async def run(input_dict, tools):
lines.append("|------------|----------|--------|---------|")
for w in sorted(ci_wfs, key=lambda x: (x["repo"], x.get("file",
""))):
eco_str = ", ".join(w.get("ecosystems", [])) or "—"
- summary = safe_str(w.get("summary"))
- lines.append(f"| {w['repo']} | `{w.get('file', '?')}` |
{sanitize_md(eco_str)} | {sanitize_md(summary)} |")
+ lines.append(f"| {w['repo']} | `{w.get('file', '?')}` |
{sanitize_md(eco_str)} | {sanitize_md(safe_str(w.get('summary')))} |")
lines.append(f"\n</details>\n")
# --- Documentation (collapsed) ---
@@ -683,40 +709,61 @@ async def run(input_dict, tools):
lines.append("|------------|----------|--------|---------|")
for w in sorted(doc_wfs, key=lambda x: (x["repo"], x.get("file",
""))):
eco_str = ", ".join(w.get("ecosystems", [])) or "—"
- summary = safe_str(w.get("summary"))
- lines.append(f"| {w['repo']} | `{w.get('file', '?')}` |
{sanitize_md(eco_str)} | {sanitize_md(summary)} |")
+ lines.append(f"| {w['repo']} | `{w.get('file', '?')}` |
{sanitize_md(eco_str)} | {sanitize_md(safe_str(w.get('summary')))} |")
lines.append(f"\n</details>\n")
- # --- Security Notes (split by severity) ---
+ # --- Security Notes by severity ---
critical_notes = [sn for sn in security_notes_all if "[CRITICAL]" in
sn["note"]]
- low_notes = [sn for sn in security_notes_all if "[LOW]" in sn["note"]]
+ leakage_notes = [sn for sn in security_notes_all if "[LOW-LEAKAGE]" in
sn["note"]]
+ trusted_input_notes = [sn for sn in security_notes_all if
"[LOW-TRUSTED-INPUT]" in sn["note"]]
downgraded_notes = [sn for sn in security_notes_all if
"[INFO-DOWNGRADED]" in sn["note"]]
+ low_notes = [sn for sn in security_notes_all if
sn["note"].startswith("[LOW]")]
if critical_notes:
- lines.append("## Security: Critical Findings\n")
- lines.append("Direct `${{ }}` interpolation in `run:` blocks —
real script injection vectors.\n")
+ lines.append("## Security: Critical — Untrusted Input Injection\n")
+ lines.append("Direct `${{ }}` interpolation of **untrusted
external input** in `run:` blocks. "
+ "These are real script injection vectors exploitable
by external contributors.\n")
for sn in critical_notes:
lines.append(f"- **{owner}/{sn['repo']}** (`{sn['file']}`):
{sn['note']}")
lines.append("")
+ if leakage_notes:
+ lines.append("## Security: Credential Leakage Risk\n")
+ lines.append("Direct `${{ secrets.* }}` interpolation in `run:`
blocks. The values are trusted "
+ "(not attacker-controlled) but direct interpolation
risks leaking them in logs if "
+ "GitHub's automatic masking is bypassed. Best
practice: pass through `env:` block.\n")
+ lines.append(f"<details>\n<summary>Show {len(leakage_notes)}
credential leakage findings</summary>\n")
+ for sn in leakage_notes:
+ lines.append(f"- **{owner}/{sn['repo']}** (`{sn['file']}`):
{sn['note']}")
+ lines.append(f"\n</details>\n")
+
+ if trusted_input_notes:
+ lines.append("## Security: Trusted Committer Input\n")
+ lines.append("Direct `${{ github.event.inputs.* }}` interpolation
in `run:` blocks for "
+ "`workflow_dispatch` workflows only triggerable by
committers. Not externally exploitable, "
+ "but could cause issues with malformed input
values.\n")
+ lines.append(f"<details>\n<summary>Show {len(trusted_input_notes)}
trusted input findings</summary>\n")
+ for sn in trusted_input_notes:
+ lines.append(f"- **{owner}/{sn['repo']}** (`{sn['file']}`):
{sn['note']}")
+ lines.append(f"\n</details>\n")
+
if downgraded_notes:
lines.append("## Security: Auto-Downgraded Findings\n")
- lines.append("These were initially flagged CRITICAL but the note
itself describes an env-mediated pattern, "
- "which is the safe approach. Verify manually if
concerned.\n")
+ lines.append("Initially flagged but the note describes an
env-mediated pattern (safe). Verify manually.\n")
lines.append(f"<details>\n<summary>Show {len(downgraded_notes)}
downgraded findings</summary>\n")
for sn in downgraded_notes:
lines.append(f"- **{owner}/{sn['repo']}** (`{sn['file']}`):
{sn['note']}")
lines.append(f"\n</details>\n")
if low_notes:
- lines.append("## Security: Low Risk Findings\n")
- lines.append("GitHub-controlled values used directly in `run:`
blocks. Not user-injectable but poor practice.\n")
+ lines.append("## Security: Low Risk\n")
+ lines.append("GitHub-controlled values used directly in `run:`
blocks.\n")
lines.append(f"<details>\n<summary>Show {len(low_notes)} low-risk
findings</summary>\n")
for sn in low_notes:
lines.append(f"- **{owner}/{sn['repo']}** (`{sn['file']}`):
{sn['note']}")
lines.append(f"\n</details>\n")
- # --- Detailed Per-Repo Results (release + snapshot only) ---
+ # --- Detailed Results (release + snapshot) ---
lines.append("## Detailed Results: Release & Snapshot Workflows\n")
detail_count = 0
@@ -728,7 +775,7 @@ async def run(input_dict, tools):
detail_count += 1
repo_ecosystems = set()
for w in repo_release:
- repo_ecosystems.update([safe_str(e).lower() for e in
(w.get("ecosystems") or [])])
+ repo_ecosystems.update([e for e in (w.get("ecosystems") or [])
if e])
cat_counts = {}
for w in repo_release:
@@ -755,10 +802,11 @@ async def run(input_dict, tools):
if w.get("publish_commands"):
lines.append(f"- **Commands**: {', '.join(f'`{c}`' for c
in w['publish_commands'])}")
sec_notes = w.get("security_notes") or []
- critical_for_wf = [downgrade_contradictions(normalize_note(n))
for n in sec_notes
- if n and "[CRITICAL]" in
downgrade_contradictions(normalize_note(n))]
- if critical_for_wf:
- lines.append(f"- **Security**: {';
'.join(critical_for_wf)}")
+ notable = [downgrade_contradictions(normalize_note(n)) for n
in sec_notes
+ if n and any(tag in
downgrade_contradictions(normalize_note(n))
+ for tag in ["[CRITICAL]",
"[LOW-LEAKAGE]", "[LOW-TRUSTED-INPUT]"])]
+ if notable:
+ lines.append(f"- **Security**: {'; '.join(notable)}")
lines.append("")
if detail_count == 0:
@@ -779,22 +827,20 @@ async def run(input_dict, tools):
# --- Errors ---
if stats["errors"]:
lines.append("## Errors\n")
- lines.append(f"{len(stats['errors'])} issues encountered during
scanning:\n")
+ lines.append(f"{len(stats['errors'])} issues encountered:\n")
for err in stats["errors"][:100]:
lines.append(f"- `{err}`")
if len(stats["errors"]) > 100:
lines.append(f"\n*...and {len(stats['errors']) - 100} more.*")
lines.append("")
- # --- Footer ---
lines.append("---\n")
lines.append(f"*Cached in `ci-classification:{owner}`. "
- f"Set `clear_cache` to `true` to force a fresh scan. "
- f"Raw YAML stored in `ci-workflows:{owner}`.*")
+ f"Set `clear_cache` to `true` to force a fresh scan.*")
report_body = "\n".join(lines)
- # ===== Build table of contents =====
+ # --- TOC ---
def to_anchor(text):
anchor = text.lower().strip()
anchor = re.sub(r'[^\w\s-]', '', anchor)
@@ -806,6 +852,8 @@ async def run(input_dict, tools):
toc_lines.append(f"- [Executive Summary](#{to_anchor('Executive
Summary')})")
if release_ecosystems:
toc_lines.append(f"- [Package Ecosystem
Distribution](#{to_anchor('Package Ecosystem Distribution releases snapshots
only')})")
+ if tp_opportunities:
+ toc_lines.append(f"- [Trusted Publishing
Opportunities](#{to_anchor('Trusted Publishing Migration Opportunities')})
({len(tp_opportunities)})")
if release_wfs:
toc_lines.append(f"- [Release Artifact
Workflows](#{to_anchor('Release Artifact Workflows')}) ({len(release_wfs)})")
if snapshot_wfs:
@@ -815,42 +863,41 @@ async def run(input_dict, tools):
if doc_wfs:
toc_lines.append(f"- [Documentation
Workflows](#{to_anchor('Documentation Website Workflows')}) ({len(doc_wfs)})")
if critical_notes:
- toc_lines.append(f"- [Security: Critical](#{to_anchor('Security
Critical Findings')}) ({len(critical_notes)})")
+ toc_lines.append(f"- [Security: Critical](#{to_anchor('Security
Critical Untrusted Input Injection')}) ({len(critical_notes)})")
+ if leakage_notes:
+ toc_lines.append(f"- [Security: Credential
Leakage](#{to_anchor('Security Credential Leakage Risk')})
({len(leakage_notes)})")
+ if trusted_input_notes:
+ toc_lines.append(f"- [Security: Trusted
Input](#{to_anchor('Security Trusted Committer Input')})
({len(trusted_input_notes)})")
if downgraded_notes:
toc_lines.append(f"- [Security: Downgraded](#{to_anchor('Security
Auto-Downgraded Findings')}) ({len(downgraded_notes)})")
if low_notes:
- toc_lines.append(f"- [Security: Low Risk](#{to_anchor('Security
Low Risk Findings')}) ({len(low_notes)})")
+ toc_lines.append(f"- [Security: Low Risk](#{to_anchor('Security
Low Risk')}) ({len(low_notes)})")
toc_lines.append(f"- [Detailed Results](#{to_anchor('Detailed Results
Release Snapshot Workflows')})")
for repo in sorted(all_results.keys()):
- repo_release = [w for w in (release_wfs + snapshot_wfs) if
w.get("repo") == repo]
- if repo_release:
- label = f"{owner}/{repo}"
- toc_lines.append(f" - [{label}](#{to_anchor(label)})")
+ if any(w.get("repo") == repo for w in (release_wfs +
snapshot_wfs)):
+ toc_lines.append(f" -
[{owner}/{repo}](#{to_anchor(f'{owner}/{repo}')})")
if non_publishing:
toc_lines.append(f"- [Non-publishing
Repos](#{to_anchor('Repositories with Workflows No Publishing Detected')})")
if stats["errors"]:
toc_lines.append(f"- [Errors](#{to_anchor('Errors')})")
toc = "\n".join(toc_lines)
-
- # Combine TOC + report (title is in TOC only, not in report_body)
full_report = toc + "\n\n---\n\n" + report_body
- # Store report and stats
report_ns.set("latest_report", full_report)
report_ns.set("latest_stats", {
"repos_scanned": stats["repos_scanned"],
"repos_with_workflows": stats["repos_with_workflows"],
"total_workflows": stats["total_workflows"],
- "publishing_repos_count": len(publishing_repos),
"publishing_repos": sorted(publishing_repos),
"by_category": {k: len(v) for k, v in by_category.items()},
"ecosystem_counts": ecosystem_counts,
- "auth_methods": auth_methods_agg,
- "trigger_types": trigger_types_agg,
"security_notes_count": len(security_notes_all),
- "critical_security_count": len(critical_notes),
- "downgraded_security_count": len(downgraded_notes),
+ "critical_count": len(critical_notes),
+ "leakage_count": len(leakage_notes),
+ "trusted_input_count": len(trusted_input_notes),
+ "downgraded_count": len(downgraded_notes),
+ "trusted_publishing_opportunities": len(tp_opportunities),
})
return {"outputText": full_report}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]