This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 10ccbd0295f Prefetch next page and add detailed progress for auto-triage (#63770)
10ccbd0295f is described below
commit 10ccbd0295f48431cd40c7949ed9c3f1deb22396
Author: Jarek Potiuk <[email protected]>
AuthorDate: Tue Mar 17 08:49:01 2026 +0100
Prefetch next page and add detailed progress for auto-triage (#63770)
* Prefetch next page, add progress indicators, and an Arch column for auto-triage
- Prefetch next page of PRs in background while processing current page
- Cache CI failure logs by commit SHA to avoid redundant fetches
- Show download progress for log fetching and failed-job fetching
- Parallelize canary build failed-jobs fetching with ThreadPoolExecutor
- Add Arch column (ARM/AMD) to canary builds table derived from job names
- Invalidate cached canary builds when failed-job details are missing
* Fix mypy error in common.ai sql_validation module
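
A minimal, self-contained sketch of the prefetch pattern this change applies
(fetch_page/review_page are hypothetical stand-ins; the real implementation is
_prefetch_next_batch/_start_next_batch_prefetch in pr_commands.py below):

    from concurrent.futures import ThreadPoolExecutor

    PAGES = [[1, 2], [3, 4], [5]]  # stand-in for paginated PR data

    def fetch_page(page_no: int) -> list[int]:
        # Placeholder for the (slow) GraphQL page fetch.
        return PAGES[page_no] if page_no < len(PAGES) else []

    def review_page(items: list[int]) -> None:
        # Placeholder for the interactive review of one page.
        print("reviewing", items)

    with ThreadPoolExecutor(max_workers=1) as executor:
        page_no = 0
        items = fetch_page(page_no)
        while items:
            # Kick off the next page download before review starts, so it
            # runs in the background while the user works on this page.
            future = executor.submit(fetch_page, page_no + 1)
            review_page(items)
            items = future.result()  # typically already finished
            page_no += 1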
Co-Authored-By: Claude Opus 4.6 (1M context) <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
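
The commit-SHA log cache added below follows this shape — a simplified sketch
with a hypothetical CACHE_DIR; the real helpers are _get_cached_log_snippets
and _save_log_cache:

    import json
    from pathlib import Path

    CACHE_DIR = Path("/tmp/log_cache")  # hypothetical location

    def save_snippets(head_sha: str, snippets: dict[str, str]) -> None:
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        payload = {"head_sha": head_sha, "snippets": snippets}
        (CACHE_DIR / f"sha_{head_sha}.json").write_text(json.dumps(payload))

    def load_snippets(head_sha: str, wanted: list[str]) -> dict[str, str] | None:
        path = CACHE_DIR / f"sha_{head_sha}.json"
        if not path.exists():
            return None
        try:
            data = json.loads(path.read_text())
        except (json.JSONDecodeError, OSError):
            return None
        snippets = data.get("snippets", {})
        # A hit only when every requested check is covered; commits are
        # immutable, so a matching SHA means the logs cannot have changed.
        return snippets if all(name in snippets for name in wanted) else None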
---
dev/breeze/doc/images/output_pr_auto-triage.svg | 2 +-
dev/breeze/doc/images/output_pr_auto-triage.txt | 2 +-
.../src/airflow_breeze/commands/pr_commands.py | 671 +++++++++++++++------
.../providers/common/ai/utils/sql_validation.py | 2 +-
4 files changed, 504 insertions(+), 173 deletions(-)
diff --git a/dev/breeze/doc/images/output_pr_auto-triage.svg b/dev/breeze/doc/images/output_pr_auto-triage.svg
index b94fd60b346..c3a56f415f0 100644
--- a/dev/breeze/doc/images/output_pr_auto-triage.svg
+++ b/dev/breeze/doc/images/output_pr_auto-triage.svg
@@ -426,7 +426,7 @@
[regenerated SVG help screenshot; per the diffstat only one rendered line changed — the --llm-concurrency option row. The wrapped, truncated text-element markup is elided.]
diff --git a/dev/breeze/doc/images/output_pr_auto-triage.txt b/dev/breeze/doc/images/output_pr_auto-triage.txt
index dc56649fa28..66cedf08b8d 100644
--- a/dev/breeze/doc/images/output_pr_auto-triage.txt
+++ b/dev/breeze/doc/images/output_pr_auto-triage.txt
@@ -1 +1 @@
-81f90099329d48933ff6824ffbf3e2b9
+8107cecf84483d0900f542688f0d0247
diff --git a/dev/breeze/src/airflow_breeze/commands/pr_commands.py b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
index efd6fa0fdd6..e49706c5865 100644
--- a/dev/breeze/src/airflow_breeze/commands/pr_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/pr_commands.py
@@ -142,6 +142,45 @@ def _save_assessment_cache(github_repository: str, pr_number: int, head_sha: str
     cache_file.write_text(json.dumps({"head_sha": head_sha, "assessment": assessment}, indent=2))
+def _get_log_cache_dir(github_repository: str) -> Path:
+ """Return the directory for storing cached CI log snippets."""
+ from airflow_breeze.utils.path_utils import BUILD_CACHE_PATH
+
+ safe_name = github_repository.replace("/", "_")
+ cache_dir = Path(BUILD_CACHE_PATH) / "log_cache" / safe_name
+ cache_dir.mkdir(parents=True, exist_ok=True)
+ return cache_dir
+
+
+def _get_cached_log_snippets(
+ github_repository: str, head_sha: str, failed_check_names: list[str]
+) -> dict[str, dict] | None:
+    """Load cached CI log snippets if they exist and match the commit hash and check names."""
+ cache_file = _get_log_cache_dir(github_repository) / f"sha_{head_sha}.json"
+ if not cache_file.exists():
+ return None
+ try:
+ data = json.loads(cache_file.read_text())
+ if data.get("head_sha") != head_sha:
+ return None
+ cached_snippets = data.get("snippets", {})
+ # Only return cache if it covers all requested check names
+ if all(name in cached_snippets for name in failed_check_names):
+ return cached_snippets
+ return None
+ except (json.JSONDecodeError, KeyError, OSError):
+ return None
+
+
+def _save_log_cache(github_repository: str, head_sha: str, snippets: dict[str, Any]) -> None:
+ """Save CI log snippets to cache keyed by commit SHA."""
+ cache_file = _get_log_cache_dir(github_repository) / f"sha_{head_sha}.json"
+ serializable = {
+        name: {"snippet": info.snippet, "job_url": info.job_url} for name, info in snippets.items()
+ }
+    cache_file.write_text(json.dumps({"head_sha": head_sha, "snippets": serializable}, indent=2))
+
+
_STATUS_CACHE_TTL_SECONDS = 4 * 3600 # 4 hours
@@ -208,15 +247,62 @@ def _cached_fetch_recent_pr_failures(
def _cached_fetch_main_canary_builds(
token: str, github_repository: str, *, branch: str = "main", count: int = 4
) -> list[dict]:
-    """Return cached canary build data, fetching fresh data when the cache expires."""
+ """Return cached canary build data with failed jobs pre-fetched.
+
+ The cache stores builds together with their failed jobs so that
+ ``_display_canary_builds_status`` can render instantly without
+ additional API calls.
+ """
cache_key = f"canary_builds_{branch}"
cached = _get_cached_status(github_repository, cache_key)
if cached is not None:
-        get_console().print("[dim]Using cached canary build data (expires after 4 h).[/]")
-        return cached
-    result = _fetch_main_canary_builds(token, github_repository, branch=branch, count=count)
-    _save_status_cache(github_repository, cache_key, result)
-    return result
+ # Verify cached builds have failed-jobs data; if any failed build is
+ # missing the key, invalidate the cache and re-fetch.
+        needs_refetch = any(b.get("conclusion") == "failure" and "_failed_jobs" not in b for b in cached)
+ if not needs_refetch:
+            get_console().print("[dim]Using cached canary build data (expires after 4 h).[/]")
+ return cached
+        get_console().print("[dim]Cached canary builds missing failed-job details, re-fetching...[/]")
+
+ get_console().print("[info]Fetching canary builds from GitHub...[/]")
+    builds = _fetch_main_canary_builds(token, github_repository, branch=branch, count=count)
+
+ # Pre-fetch failed jobs for all failed builds in parallel so they are
+ # available in the cache and don't block display later.
+    failed_builds = [b for b in builds if b.get("conclusion") == "failure" and b.get("id")]
+ if failed_builds:
+ from concurrent.futures import ThreadPoolExecutor, as_completed
+
+ get_console().print(
+ f"[info]Fetching failed jobs for {len(failed_builds)} "
+ f"failed {'builds' if len(failed_builds) != 1 else 'build'}...[/]"
+ )
+        with ThreadPoolExecutor(max_workers=min(len(failed_builds), 4)) as executor:
+            futures = {
+                executor.submit(_fetch_failed_jobs_for_run, token, github_repository, b["id"]): b["id"]
+ for b in failed_builds
+ }
+ jobs_by_run: dict[int, list[dict]] = {}
+ done_count = 0
+ for future in as_completed(futures):
+ run_id = futures[future]
+ done_count += 1
+ try:
+ jobs_by_run[run_id] = future.result()
+ except Exception:
+ jobs_by_run[run_id] = []
+ get_console().print(
+                    f"  [dim]({done_count}/{len(failed_builds)}) fetched jobs for run {run_id}[/]"
+ )
+
+ # Embed the failed-jobs list directly in each build dict so the
+ # display function can use it without making extra API calls.
+ for build in builds:
+ if build.get("id") in jobs_by_run:
+ build["_failed_jobs"] = jobs_by_run[build["id"]]
+
+ _save_status_cache(github_repository, cache_key, builds)
+ return builds
def _cached_assess_pr(
@@ -629,6 +715,19 @@ class StaleReviewInfo:
     author_pinged_reviewer: bool  # whether the author mentioned the reviewer after the review
+@dataclass
+class BatchPrefetchResult:
+ """Result from a background prefetch of the next page of PRs."""
+
+ all_prs: list[PRData]
+ has_next_page: bool
+ next_cursor: str | None
+ candidate_prs: list[PRData]
+ accepted_prs: list[PRData]
+ triaged_classification: dict[str, set[int]]
+ reclassified_count: int
+
+
@dataclass
class ReviewComment:
"""A single line-level review comment proposed by the LLM."""
@@ -2608,8 +2707,8 @@ def _launch_background_log_fetching(
pr_word = "PRs" if len(prs_with_failures) != 1 else "PR"
get_console().print(
-        f"[info]Launched CI log fetching for {len(prs_with_failures)} {pr_word} "
-        f"with failures in background (concurrency: {llm_concurrency}).[/]"
+        f"[info]Downloading CI failure logs for {len(prs_with_failures)} {pr_word} "
+        f"in background (concurrency: {llm_concurrency}).[/]"
)
return log_futures
@@ -3055,34 +3154,18 @@ def _prompt_and_execute_flagged_pr(
if log_snippets:
_display_log_snippets_panel(log_snippets, pr=pr)
else:
- # Logs are still being fetched — offer user to wait or cancel
- from airflow_breeze.utils.confirm import _read_char
-
-            get_console().print("  [dim]CI failure logs are still being fetched in the background...[/]")
- get_console().print(
-                "  Press any key to [bold]wait[/] or [bold]\\[c]ancel[/] log retrieval for this PR: ",
- end="",
- )
+ # Logs are still being fetched — wait automatically
+ get_console().print(" [dim]Downloading CI failure logs...[/]")
try:
- ch = _read_char()
- except (KeyboardInterrupt, EOFError):
- ch = "c"
- get_console().print(ch if len(ch) == 1 else "")
-
- if ch.lower() != "c":
- get_console().print(" [dim]Waiting for CI logs...[/]")
- try:
- log_snippets = log_future.result(timeout=120)
- if log_snippets:
- _display_log_snippets_panel(log_snippets, pr=pr)
- except TimeoutError:
-                    get_console().print("  [warning]CI log retrieval timed out.[/]")
-                except Exception:
-                    get_console().print("  [warning]CI log retrieval failed.[/]")
- else:
-                get_console().print("  [dim]Skipping CI log display for this PR.[/]")
+ log_snippets = log_future.result(timeout=120)
+ if log_snippets:
+ _display_log_snippets_panel(log_snippets, pr=pr)
+ except TimeoutError:
+                get_console().print("  [warning]CI log retrieval timed out.[/]")
+            except Exception:
+                get_console().print("  [warning]CI log retrieval failed.[/]")
else:
- # No background future — fetch inline as fallback
+        # No background future — fetch inline as fallback (progress shown by the function)
log_snippets = _fetch_failed_job_log_snippets(
ctx.token, ctx.github_repository, pr.head_sha, pr.failed_checks
)
@@ -3396,42 +3479,178 @@ def _enrich_candidate_details(
if not candidate_prs:
return
- console_print(
- f"[info]Fetching check details for {len(candidate_prs)} "
- f"{'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
- )
+ n = len(candidate_prs)
+ pr_word = "PRs" if n != 1 else "PR"
+ total_steps = 2 + (1 if run_api else 0)
+ step = 0
+
+ step += 1
+ t_step = time.monotonic()
+    console_print(f"  [info][{step}/{total_steps}] Fetching check details for {n} candidate {pr_word}...[/]")
_fetch_check_details_batch(token, github_repository, candidate_prs)
for pr in candidate_prs:
         if pr.checks_state == "FAILURE" and not pr.failed_checks and pr.head_sha:
console_print(
- f" [dim]Fetching full check details for PR {_pr_link(pr)} "
+ f" [dim]Fetching full check details for PR {_pr_link(pr)} "
f"(failures beyond first 100 checks)...[/]"
)
             pr.failed_checks = _fetch_failed_checks(token, github_repository, pr.head_sha)
+    console_print(f"    [dim]done ({_fmt_duration(time.monotonic() - t_step)})[/]")
+ step += 1
+ t_step = time.monotonic()
unknown_count = sum(1 for pr in candidate_prs if pr.mergeable == "UNKNOWN")
if unknown_count:
console_print(
- f"[info]Resolving merge conflict status for {unknown_count} "
-            f"{'PRs' if unknown_count != 1 else 'PR'} with unknown status...[/]"
+ f" [info][{step}/{total_steps}] Resolving merge conflict status "
+ f"for {unknown_count} {pr_word}...[/]"
)
         resolved = _resolve_unknown_mergeable(token, github_repository, candidate_prs)
remaining = unknown_count - resolved
if remaining:
console_print(
- f" [dim]{resolved} resolved, {remaining} still unknown "
- f"(GitHub hasn't computed mergeability yet).[/]"
+ f" [dim]{resolved} resolved, {remaining} still unknown "
+ f"({_fmt_duration(time.monotonic() - t_step)})[/]"
)
else:
- console_print(f" [dim]All {resolved} resolved.[/]")
+            console_print(f"    [dim]All {resolved} resolved ({_fmt_duration(time.monotonic() - t_step)})[/]")
+ else:
+        console_print(f"  [info][{step}/{total_steps}] Merge conflict status: all known (skip)[/]")
if run_api:
+ step += 1
+ t_step = time.monotonic()
console_print(
- f"[info]Fetching review thread details for {len(candidate_prs)} "
- f"{'PRs' if len(candidate_prs) != 1 else 'PR'}...[/]"
+            f"  [info][{step}/{total_steps}] Fetching review thread details for {n} candidate {pr_word}...[/]"
)
         _fetch_unresolved_comments_batch(token, github_repository, candidate_prs)
+        console_print(f"    [dim]done ({_fmt_duration(time.monotonic() - t_step)})[/]")
+
+
+def _prefetch_next_batch(
+ *,
+ token: str,
+ github_repository: str,
+ exact_labels: tuple[str, ...],
+ exact_exclude_labels: tuple[str, ...],
+ filter_user: str | None,
+ sort: str,
+ batch_size: int,
+ created_after: str | None,
+ created_before: str | None,
+ updated_after: str | None,
+ updated_before: str | None,
+ review_requested_user: str | None,
+ next_cursor: str | None,
+ wildcard_labels: list[str],
+ wildcard_exclude_labels: list[str],
+ include_collaborators: bool,
+ include_drafts: bool,
+ checks_state: str,
+ min_commits_behind: int,
+ max_num: int,
+ viewer_login: str,
+) -> BatchPrefetchResult | None:
+ """Prefetch and prepare the next page of PRs in a background thread.
+
+ Performs GraphQL fetch, wildcard filtering, commits-behind resolution,
+ mergeable status resolution, NOT_RUN reclassification, candidate filtering,
+ and triage classification — everything up to the point where interactive
+ review begins.
+
+ Returns None if no PRs are found.
+ """
+ from fnmatch import fnmatch
+
+ all_prs, has_next_page, new_cursor = _fetch_prs_graphql(
+ token,
+ github_repository,
+ labels=exact_labels,
+ exclude_labels=exact_exclude_labels,
+ filter_user=filter_user,
+ sort=sort,
+ batch_size=batch_size,
+ created_after=created_after,
+ created_before=created_before,
+ updated_after=updated_after,
+ updated_before=updated_before,
+ review_requested=review_requested_user,
+ after_cursor=next_cursor,
+ )
+ if not all_prs:
+ return None
+
+ # Apply wildcard label filters client-side
+ if wildcard_labels:
+ all_prs = [
+            pr for pr in all_prs if any(fnmatch(lbl, pat) for pat in wildcard_labels for lbl in pr.labels)
+ ]
+ if wildcard_exclude_labels:
+ all_prs = [
+ pr
+ for pr in all_prs
+            if not any(fnmatch(lbl, pat) for pat in wildcard_exclude_labels for lbl in pr.labels)
+ ]
+
+ if not all_prs:
+ return None
+
+ # Enrich: commits behind
+ behind_map = _fetch_commits_behind_batch(token, github_repository, all_prs)
+ for pr in all_prs:
+ pr.commits_behind = behind_map.get(pr.number, 0)
+
+ # Resolve unknown mergeable status
+ unknown_count = sum(1 for pr in all_prs if pr.mergeable == "UNKNOWN")
+ if unknown_count:
+ _resolve_unknown_mergeable(token, github_repository, all_prs)
+
+ # Detect NOT_RUN reclassification
+ non_collab_success = [
+ pr
+ for pr in all_prs
+ if pr.checks_state == "SUCCESS"
+ and pr.author_association not in _COLLABORATOR_ASSOCIATIONS
+ and not _is_bot_account(pr.author_login)
+ ]
+ reclassified_count = 0
+ if non_collab_success:
+        _fetch_check_details_batch(token, github_repository, non_collab_success)
+        reclassified_count = sum(1 for pr in non_collab_success if pr.checks_state == "NOT_RUN")
+
+ # Filter candidates
+ candidate_prs, accepted_prs, _, _, _ = _filter_candidate_prs(
+ all_prs,
+ include_collaborators=include_collaborators,
+ include_drafts=include_drafts,
+ checks_state=checks_state,
+ min_commits_behind=min_commits_behind,
+ max_num=max_num,
+ )
+
+ # Classify already triaged
+ triaged_classification = _classify_already_triaged_prs(
+ token, github_repository, candidate_prs, viewer_login
+ )
+
+ return BatchPrefetchResult(
+ all_prs=all_prs,
+ has_next_page=has_next_page,
+ next_cursor=new_cursor,
+ candidate_prs=candidate_prs,
+ accepted_prs=accepted_prs,
+ triaged_classification=triaged_classification,
+ reclassified_count=reclassified_count,
+ )
+
+
+def _start_next_batch_prefetch(
+ executor: ThreadPoolExecutor,
+ **kwargs,
+) -> Future[BatchPrefetchResult | None]:
+ """Submit a background prefetch of the next batch to the given executor."""
+ return executor.submit(_prefetch_next_batch, **kwargs)
 def _review_workflow_approval_prs(ctx: TriageContext, pending_approval: list[PRData]) -> None:
@@ -5299,7 +5518,22 @@ def _fetch_failed_job_log_snippets(
     Returns a dict mapping failed check name -> LogSnippetInfo (snippet + job URL).
Only fetches logs for checks in ``failed_check_names`` to limit API calls.
+ Results are cached by commit SHA so repeated runs skip the download.
"""
+ # Check cache first
+    cached = _get_cached_log_snippets(github_repository, head_sha, failed_check_names)
+ if cached is not None:
+        get_console().print(f"[dim]Using cached CI logs for {head_sha[:8]} ({len(cached)} checks).[/]")
+ return {
+            name: LogSnippetInfo(snippet=info["snippet"], job_url=info["job_url"])
+ for name, info in cached.items()
+ }
+
+ check_word = "check" if len(failed_check_names) == 1 else "checks"
+ get_console().print(
+ f"[info]Downloading CI failure logs for {head_sha[:8]} "
+ f"({len(failed_check_names)} failed {check_word})...[/]"
+ )
import io
import zipfile
@@ -5402,6 +5636,10 @@ def _fetch_failed_job_log_snippets(
if all(name in snippets for name in failed_check_names):
break
+ # Cache results for future runs with the same commit
+ if snippets:
+ _save_log_cache(github_repository, head_sha, snippets)
+
return snippets
@@ -5629,6 +5867,21 @@ def _fetch_main_canary_builds(
return [r for r in runs if r.get("name") == "Tests"][:count]
+def _platform_from_name(name: str) -> str:
+ """Determine platform (ARM/AMD) from the job name.
+
+ Airflow CI job names typically contain 'ARM' or 'AMD' as a segment,
+ e.g. ``Tests / AMD Python 3.9 / ...`` or ``Tests / ARM Python 3.9 / ...``.
+ Falls back to ``AMD`` when the name does not contain a clear indicator.
+ """
+ upper = name.upper()
+ if "ARM" in upper or "AARCH64" in upper:
+ return "ARM"
+ if "AMD" in upper or "X86" in upper or "X64" in upper:
+ return "AMD"
+ return ""
+
+
def _platform_from_labels(labels: list[str]) -> str:
"""Determine platform (ARM/AMD) from GitHub Actions job runner labels."""
for label in labels:
@@ -5667,18 +5920,25 @@ def _fetch_failed_jobs_for_run(token: str, github_repository: str, run_id: int)
failed = []
for job in all_jobs:
if job.get("conclusion") == "failure":
+ job_name = job.get("name", "unknown")
+            # Prefer platform detection from job name; fall back to runner labels
+            platform = _platform_from_name(job_name) or _platform_from_labels(job.get("labels") or [])
failed.append(
{
- "name": job.get("name", "unknown"),
- "platform": _platform_from_labels(job.get("labels") or []),
+ "name": job_name,
+ "platform": platform,
"html_url": job.get("html_url", ""),
}
)
return failed
-def _display_canary_builds_status(builds: list[dict], token: str, github_repository: str) -> None:
-    """Display a Rich table showing the status of recent scheduled Tests builds."""
+def _display_canary_builds_status(builds: list[dict]) -> None:
+    """Display a Rich table showing the status of recent scheduled Tests builds.
+
+ Failed jobs are expected to be pre-fetched and embedded in each build dict
+ under the ``_failed_jobs`` key by ``_cached_fetch_main_canary_builds``.
+ """
from rich.table import Table
console = get_console()
@@ -5690,6 +5950,7 @@ def _display_canary_builds_status(builds: list[dict], token: str, github_reposit
table = Table(title="Main Branch Tests Builds (scheduled)", expand=False)
table.add_column("Status", justify="center")
table.add_column("Started", justify="right")
+ table.add_column("Arch", justify="center")
table.add_column("Failed Jobs", style="red")
table.add_column("Link", style="dim")
@@ -5723,17 +5984,22 @@ def _display_canary_builds_status(builds: list[dict], token: str, github_reposit
# Clickable link to the workflow run page
link = f"[link={html_url}]checks[/link]" if html_url else str(run_id)
- # Fetch failed jobs for failed builds
+    # Use pre-fetched failed jobs (embedded by _cached_fetch_main_canary_builds)
failed_jobs_display = ""
- if conclusion == "failure" and run_id:
-        failed_jobs = _fetch_failed_jobs_for_run(token, github_repository, run_id)
- if failed_jobs:
- parts = []
- for fj in failed_jobs:
- parts.append(f"{fj['name']} ({fj['platform']})")
- failed_jobs_display = "\n".join(parts)
-
- table.add_row(status_display, age, failed_jobs_display, link)
+ failed_jobs = build.get("_failed_jobs", [])
+ # Determine unique architectures from failed jobs
+ archs: set[str] = set()
+ if failed_jobs:
+ parts = []
+ for fj in failed_jobs:
+ platform = fj.get("platform", "")
+ if platform:
+ archs.add(platform)
+ parts.append(fj["name"])
+ failed_jobs_display = "\n".join(parts)
+ arch_display = ", ".join(sorted(archs)) if archs else ""
+
+    table.add_row(status_display, age, arch_display, failed_jobs_display, link)
console.print(table)
console.print()
@@ -6051,7 +6317,7 @@ def _display_recent_pr_failure_panel(
@click.option(
"--llm-concurrency",
type=int,
- default=4,
+ default=8,
show_default=True,
help="Number of concurrent LLM assessment calls.",
)
@@ -6131,6 +6397,7 @@ def auto_triage(
("review", _get_review_cache_dir),
("triage", _get_triage_cache_dir),
("status", _get_status_cache_dir),
+ ("log", _get_log_cache_dir),
]:
cache_dir = get_dir(github_repository)
if cache_dir.exists():
@@ -6174,12 +6441,17 @@ def auto_triage(
# Refresh collaborators cache in the background on every run
_refresh_collaborators_cache_in_background(token, github_repository)
- # Preload main branch CI failure information (cached for 4 hours)
- main_failures = _cached_fetch_recent_pr_failures(token, github_repository)
-
-    # Show status of recent scheduled (canary) builds on main branch (cached for 4 hours)
- canary_builds = _cached_fetch_main_canary_builds(token, github_repository)
- _display_canary_builds_status(canary_builds, token, github_repository)
+    # Preload main branch CI failure information and canary builds in parallel (both cached for 4 hours)
+ with ThreadPoolExecutor(max_workers=2) as startup_executor:
+ main_failures_future = startup_executor.submit(
+ _cached_fetch_recent_pr_failures, token, github_repository
+ )
+ canary_builds_future = startup_executor.submit(
+ _cached_fetch_main_canary_builds, token, github_repository
+ )
+ main_failures = main_failures_future.result()
+ canary_builds = canary_builds_future.result()
+ _display_canary_builds_status(canary_builds)
     # Resolve review-requested filter: --reviews-for-me uses authenticated user, --reviews-for uses specified users
review_requested_user: str | None = None
@@ -6209,16 +6481,25 @@ def auto_triage(
t_total_start = time.monotonic()
- # Phase 1: Lightweight fetch of PRs via GraphQL (no check contexts — fast)
+ # Phase 1: Fetch and prepare PRs
+ console.print("\n[bold]Phase 1: Fetching and preparing PRs[/bold]")
t_phase1_start = time.monotonic()
has_next_page = False
next_cursor: str | None = None
+ step_num = 0
+
+ # Step 1: Fetch PRs via GraphQL
+ step_num += 1
+ t_step = time.monotonic()
if pr_number:
- console_print(f"[info]Fetching PR #{pr_number} via GraphQL...[/]")
+        console_print(f"  [info][{step_num}/7] Fetching PR #{pr_number} via GraphQL...[/]")
         all_prs = [_fetch_single_pr_graphql(token, github_repository, pr_number)]
elif len(review_requested_users) > 1:
         # Multiple reviewers: fetch PRs for each reviewer and merge (deduplicate)
-        console_print("[info]Fetching PRs via GraphQL for multiple reviewers...[/]")
+ console_print(
+ f" [info][{step_num}/7] Fetching PRs via GraphQL "
+ f"for {len(review_requested_users)} reviewers...[/]"
+ )
seen_numbers: set[int] = set()
all_prs = []
for reviewer in review_requested_users:
@@ -6243,7 +6524,7 @@ def auto_triage(
# Disable pagination for multi-reviewer queries
has_next_page = False
else:
- console_print("[info]Fetching PRs via GraphQL...[/]")
+ console_print(f" [info][{step_num}/7] Fetching PRs via GraphQL...[/]")
all_prs, has_next_page, next_cursor = _fetch_prs_graphql(
token,
github_repository,
@@ -6258,6 +6539,10 @@ def auto_triage(
updated_before=updated_before,
review_requested=review_requested_user,
)
+ console_print(
+ f" [dim]{len(all_prs)} PRs fetched"
+        f"{' (more pages available)' if has_next_page else ''} ({_fmt_duration(time.monotonic() - t_step)})[/]"
+ )
# Apply wildcard label filters client-side
if wildcard_labels:
@@ -6301,36 +6586,48 @@ def auto_triage(
reviewed_by_prs.add(pr.number)
if reviewed_by_prs:
console.print(
- f"[info]Also found {len(reviewed_by_prs)} "
+ f" [dim]Also found {len(reviewed_by_prs)} "
f"{'PRs' if len(reviewed_by_prs) != 1 else 'PR'} "
f"previously reviewed by {',
'.join(review_requested_users)}.[/]"
)
- # Resolve how far behind base branch each PR is
- console_print("[info]Checking how far behind base branch each PR is...[/]")
+ # Step 2: Resolve how far behind base branch each PR is
+ step_num += 1
+ t_step = time.monotonic()
+    console_print(f"  [info][{step_num}/7] Checking how far behind base branch each PR is...[/]")
behind_map = _fetch_commits_behind_batch(token, github_repository, all_prs)
for pr in all_prs:
pr.commits_behind = behind_map.get(pr.number, 0)
+ max_behind = max(behind_map.values()) if behind_map else 0
+ console_print(
+        f"    [dim]done (max {max_behind} commits behind) ({_fmt_duration(time.monotonic() - t_step)})[/]"
+ )
- # Resolve UNKNOWN mergeable status before displaying the overview table
+    # Step 3: Resolve UNKNOWN mergeable status before displaying the overview table
+ step_num += 1
+ t_step = time.monotonic()
unknown_count = sum(1 for pr in all_prs if pr.mergeable == "UNKNOWN")
if unknown_count:
console_print(
- f"[info]Resolving merge conflict status for {unknown_count} "
-            f"{'PRs' if unknown_count != 1 else 'PR'} with unknown status...[/]"
+ f" [info][{step_num}/7] Resolving merge conflict status "
+            f"for {unknown_count} {'PRs' if unknown_count != 1 else 'PR'}...[/]"
)
         resolved = _resolve_unknown_mergeable(token, github_repository, all_prs)
remaining = unknown_count - resolved
if remaining:
console_print(
- f" [dim]{resolved} resolved, {remaining} still unknown "
- f"(GitHub hasn't computed mergeability yet).[/]"
+ f" [dim]{resolved} resolved, {remaining} still unknown "
+ f"({_fmt_duration(time.monotonic() - t_step)})[/]"
)
else:
- console_print(f" [dim]All {resolved} resolved.[/]")
+            console_print(f"    [dim]All {resolved} resolved ({_fmt_duration(time.monotonic() - t_step)})[/]")
+ else:
+        console_print(f"  [info][{step_num}/7] Merge conflict status: all known (skip)[/]")
-    # Detect PRs whose rollup state is SUCCESS but only have bot/labeler checks (no real CI).
+    # Step 4: Detect PRs whose rollup state is SUCCESS but only have bot/labeler checks (no real CI).
     # These need to be reclassified as NOT_RUN so they get routed to workflow approval.
+ step_num += 1
+ t_step = time.monotonic()
non_collab_success = [
pr
for pr in all_prs
@@ -6340,19 +6637,23 @@ def auto_triage(
]
if non_collab_success:
console_print(
- f"[info]Verifying CI status for {len(non_collab_success)} "
- f"{'PRs' if len(non_collab_success) != 1 else 'PR'} "
- f"showing SUCCESS (checking for real test checks)...[/]"
+            f"  [info][{step_num}/7] Verifying CI status for {len(non_collab_success)} "
+            f"{'PRs' if len(non_collab_success) != 1 else 'PR'} showing SUCCESS...[/]"
)
         _fetch_check_details_batch(token, github_repository, non_collab_success)
         reclassified = sum(1 for pr in non_collab_success if pr.checks_state == "NOT_RUN")
if reclassified:
console_print(
-                f"  [warning]{reclassified} {'PRs' if reclassified != 1 else 'PR'} "
-                f"reclassified to NOT_RUN (only bot/labeler checks, no real CI).[/]"
+ f" [warning]{reclassified} reclassified to NOT_RUN "
+                f"(only bot/labeler checks) ({_fmt_duration(time.monotonic() - t_step)})[/]"
)
+ else:
+            console_print(f"    [dim]All verified ({_fmt_duration(time.monotonic() - t_step)})[/]")
+ else:
+        console_print(f"  [info][{step_num}/7] CI status verification: none needed (skip)[/]")
- # Filter candidates first
+ # Step 5: Filter candidates
+ step_num += 1
     candidate_prs, accepted_prs, total_skipped_collaborator, total_skipped_bot, total_skipped_accepted = (
_filter_candidate_prs(
all_prs,
@@ -6364,9 +6665,20 @@ def auto_triage(
also_accepted=reviewed_by_prs if review_mode else None,
)
)
+ console_print(
+ f" [info][{step_num}/7] Filtering candidates: "
+ f"{len(candidate_prs)} candidates, {len(accepted_prs)} accepted "
+ f"(skipped: {total_skipped_collaborator} collaborators, "
+        f"{total_skipped_bot} bots, {total_skipped_accepted} already accepted)[/]"
+ )
-    # Exclude PRs that already have a triage comment posted after the last commit
-    console_print("[info]Checking for PRs already triaged (no new commits since last triage comment)...[/]")
+    # Step 6: Exclude PRs that already have a triage comment posted after the last commit
+ step_num += 1
+ t_step = time.monotonic()
+ console_print(
+ f" [info][{step_num}/7] Checking for already-triaged PRs "
+ f"(no new commits since last triage comment)...[/]"
+ )
triaged_classification = _classify_already_triaged_prs(
token, github_repository, candidate_prs, viewer_login
)
@@ -6378,15 +6690,17 @@ def auto_triage(
         already_triaged = [pr for pr in candidate_prs if pr.number in already_triaged_nums]
         candidate_prs = [pr for pr in candidate_prs if pr.number not in already_triaged_nums]
console_print(
- f"[info]Skipped {len(already_triaged)} already-triaged "
- f"{'PRs' if len(already_triaged) != 1 else 'PR'} "
+ f" [dim]Skipped {len(already_triaged)} already-triaged "
f"({triaged_waiting_count} commented, "
- f"{triaged_responded_count} author responded).[/]"
+ f"{triaged_responded_count} author responded) "
+ f"({_fmt_duration(time.monotonic() - t_step)})[/]"
)
else:
- console_print(" [dim]None found.[/]")
+        console_print(f"    [dim]None found ({_fmt_duration(time.monotonic() - t_step)})[/]")
-    # Display overview table (after triaged detection so we can mark actionable PRs)
+ # Step 7: Display overview table
+ step_num += 1
+ console_print(f" [info][{step_num}/7] Displaying overview table[/]")
_display_pr_overview_table(
all_prs,
triaged_waiting_nums=triaged_classification["waiting"],
@@ -6394,6 +6708,10 @@ def auto_triage(
)
t_phase1_end = time.monotonic()
+ console.print(
+ f"[bold]Phase 1 complete:[/bold] {len(candidate_prs)} PRs to triage, "
+        f"{len(accepted_prs)} accepted ({_fmt_duration(t_phase1_end - t_phase1_start)})\n"
+ )
# --- Review mode: early exit into review flow for accepted PRs ---
if review_mode:
@@ -6754,6 +7072,36 @@ def auto_triage(
log_futures=log_futures,
)
+    # Start prefetching next page in background while user reviews current batch
+    prefetch_executor = ThreadPoolExecutor(max_workers=1) if has_next_page and not pr_number else None
+ next_batch_future: Future[BatchPrefetchResult | None] | None = None
+ prefetch_kwargs = dict(
+ token=token,
+ github_repository=github_repository,
+ exact_labels=exact_labels,
+ exact_exclude_labels=exact_exclude_labels,
+ filter_user=filter_user,
+ sort=sort,
+ batch_size=batch_size,
+ created_after=created_after,
+ created_before=created_before,
+ updated_after=updated_after,
+ updated_before=updated_before,
+ review_requested_user=review_requested_user,
+ wildcard_labels=wildcard_labels,
+ wildcard_exclude_labels=wildcard_exclude_labels,
+ include_collaborators=include_collaborators,
+ include_drafts=include_drafts,
+ checks_state=checks_state,
+ min_commits_behind=min_commits_behind,
+ max_num=max_num,
+ viewer_login=viewer_login,
+ )
+ if prefetch_executor and has_next_page:
+ next_batch_future = _start_next_batch_prefetch(
+ prefetch_executor, next_cursor=next_cursor, **prefetch_kwargs
+ )
+
try:
         # Phase 4b: Present NOT_RUN PRs for workflow approval (LLM runs in background)
_review_workflow_approval_prs(ctx, pending_approval)
@@ -6782,97 +7130,62 @@ def auto_triage(
if llm_executor is not None:
llm_executor.shutdown(wait=False, cancel_futures=True)
- # Fetch and process next batch if available and user hasn't quit
- while has_next_page and not stats.quit_early and not pr_number:
+ # Process subsequent batches using prefetched data
+    while not stats.quit_early and not pr_number and next_batch_future is not None:
batch_num = getattr(stats, "_batch_count", 1) + 1
stats._batch_count = batch_num # type: ignore[attr-defined]
-        console_print(f"\n[info]Batch complete. Fetching next batch (page {batch_num})...[/]\n")
- all_prs, has_next_page, next_cursor = _fetch_prs_graphql(
- token,
- github_repository,
- labels=exact_labels,
- exclude_labels=exact_exclude_labels,
- filter_user=filter_user,
- sort=sort,
- batch_size=batch_size,
- created_after=created_after,
- created_before=created_before,
- updated_after=updated_after,
- updated_before=updated_before,
- review_requested=review_requested_user,
- after_cursor=next_cursor,
- )
- if not all_prs:
+
+        # Wait for the prefetched result (should already be done or nearly done)
+ t_wait_start = time.monotonic()
+ was_ready = next_batch_future.done()
+ prefetch_result = next_batch_future.result()
+ t_wait = time.monotonic() - t_wait_start
+ if was_ready:
+            console_print(f"\n[info]Batch {batch_num}: next page already prefetched.[/]")
+ else:
+ console_print(
+ f"\n[info]Batch {batch_num}: waited {_fmt_duration(t_wait)} "
+ f"for background prefetch to complete.[/]"
+ )
+ next_batch_future = None
+
+ if prefetch_result is None:
console_print("[info]No more PRs to process.[/]")
break
- # Apply wildcard label filters client-side
- if wildcard_labels:
- all_prs = [
-                pr for pr in all_prs if any(fnmatch(lbl, pat) for pat in wildcard_labels for lbl in pr.labels)
- ]
- if wildcard_exclude_labels:
- all_prs = [
- pr
- for pr in all_prs
-                if not any(fnmatch(lbl, pat) for pat in wildcard_exclude_labels for lbl in pr.labels)
- ]
+ all_prs = prefetch_result.all_prs
+ has_next_page = prefetch_result.has_next_page
+ next_cursor = prefetch_result.next_cursor
+ candidate_prs = prefetch_result.candidate_prs
+ batch_accepted = prefetch_result.accepted_prs
+ accepted_prs.extend(batch_accepted)
- # Enrich: commits behind, mergeable status
-        behind_map = _fetch_commits_behind_batch(token, github_repository, all_prs)
- for pr in all_prs:
- pr.commits_behind = behind_map.get(pr.number, 0)
- unknown_count = sum(1 for pr in all_prs if pr.mergeable == "UNKNOWN")
- if unknown_count:
- _resolve_unknown_mergeable(token, github_repository, all_prs)
+ console_print(
+ f"[info]Batch {batch_num}: {len(all_prs)} PRs fetched, "
+ f"{len(candidate_prs)} candidates"
+ f"{' (more pages available)' if has_next_page else ''}"
+ f" (wait: {_fmt_duration(t_wait)})[/]"
+ )
-        # Detect PRs whose rollup state is SUCCESS but only have bot/labeler checks
- batch_non_collab_success = [
- pr
- for pr in all_prs
- if pr.checks_state == "SUCCESS"
- and pr.author_association not in _COLLABORATOR_ASSOCIATIONS
- and not _is_bot_account(pr.author_login)
- ]
- if batch_non_collab_success:
+ if prefetch_result.reclassified_count:
console_print(
-                f"[info]Verifying CI status for {len(batch_non_collab_success)} "
- f"{'PRs' if len(batch_non_collab_success) != 1 else 'PR'} "
- f"showing SUCCESS...[/]"
+ f" [warning]{prefetch_result.reclassified_count} "
+                f"{'PRs' if prefetch_result.reclassified_count != 1 else 'PR'} "
+ f"reclassified to NOT_RUN (only bot/labeler checks).[/]"
)
-            _fetch_check_details_batch(token, github_repository, batch_non_collab_success)
-            reclassified = sum(1 for pr in batch_non_collab_success if pr.checks_state == "NOT_RUN")
- if reclassified:
- console_print(
-                    f"  [warning]{reclassified} {'PRs' if reclassified != 1 else 'PR'} "
- f"reclassified to NOT_RUN (only bot/labeler checks).[/]"
- )
-
- (
- candidate_prs,
- batch_accepted,
- _,
- _,
- _,
- ) = _filter_candidate_prs(
- all_prs,
- include_collaborators=include_collaborators,
- include_drafts=include_drafts,
- checks_state=checks_state,
- min_commits_behind=min_commits_behind,
- max_num=max_num,
- )
- accepted_prs.extend(batch_accepted)
if not candidate_prs:
console_print("[info]No PRs to assess in this batch.[/]")
_display_pr_overview_table(all_prs)
+ # Start prefetching the next page if available
+ if has_next_page and prefetch_executor:
+ next_batch_future = _start_next_batch_prefetch(
+                    prefetch_executor, next_cursor=next_cursor, **prefetch_kwargs
+ )
continue
- # Check already-triaged
- batch_triaged_cls = _classify_already_triaged_prs(
- token, github_repository, candidate_prs, viewer_login
- )
+ # Apply triage classification from prefetch
+ batch_triaged_cls = prefetch_result.triaged_classification
         batch_triaged_nums = batch_triaged_cls["waiting"] | batch_triaged_cls["responded"]
if batch_triaged_nums:
             candidate_prs = [pr for pr in candidate_prs if pr.number not in batch_triaged_nums]
@@ -6885,6 +7198,11 @@ def auto_triage(
if not candidate_prs:
console_print("[info]All PRs in this batch already triaged.[/]")
+ # Start prefetching the next page if available
+ if has_next_page and prefetch_executor:
+ next_batch_future = _start_next_batch_prefetch(
+                    prefetch_executor, next_cursor=next_cursor, **prefetch_kwargs
+ )
continue
# Enrich and assess
@@ -7012,6 +7330,12 @@ def auto_triage(
log_futures=batch_log_futures,
)
+ # Start prefetching the NEXT page before entering interactive review
+ if has_next_page and prefetch_executor:
+ next_batch_future = _start_next_batch_prefetch(
+ prefetch_executor, next_cursor=next_cursor, **prefetch_kwargs
+ )
+
try:
_review_workflow_approval_prs(batch_ctx, batch_pending)
@@ -7033,6 +7357,13 @@ def auto_triage(
if batch_executor is not None:
batch_executor.shutdown(wait=False, cancel_futures=True)
+ # Clean up prefetch executor
+ if prefetch_executor is not None:
+ # Cancel any pending prefetch if user quit early
+ if next_batch_future is not None and not next_batch_future.done():
+ next_batch_future.cancel()
+ prefetch_executor.shutdown(wait=False, cancel_futures=True)
+
# Display summary
_display_triage_summary(
all_prs,
diff --git a/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py b/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
index 22361949f3b..e20b8804f80 100644
--- a/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
+++ b/providers/common/ai/src/airflow/providers/common/ai/utils/sql_validation.py
@@ -81,7 +81,7 @@ def validate_sql(
raise SQLSafetyError(f"SQL parse error: {e}") from e
# sqlglot.parse can return [None] for empty input
- parsed: list[exp.Expression] = [s for s in statements if s is not None]
+    parsed: list[exp.Expression] = [s for s in statements if s is not None]  # type: ignore[misc]
if not parsed:
raise SQLSafetyError("Empty SQL input.")