choo121600 commented on code in PR #63319:
URL: https://github.com/apache/airflow/pull/63319#discussion_r2918111658


##########
dev/breeze/src/airflow_breeze/commands/pr_commands.py:
##########
@@ -1836,420 +2465,76 @@ def auto_triage(
         if pending_approval:
             skipped_detail += f", {len(pending_approval)} awaiting workflow 
approval"
         get_console().print(
-            f"\n[info]Running LLM assessment for {len(llm_candidates)} "
-            f"{'PRs' if len(llm_candidates) != 1 else 'PR'} (skipped 
{skipped_detail})...[/]\n"
+            f"\n[info]Starting LLM assessment for {len(llm_candidates)} "
+            f"{'PRs' if len(llm_candidates) != 1 else 'PR'} in background "
+            f"(skipped {skipped_detail})...[/]\n"
         )
-        with ThreadPoolExecutor(max_workers=llm_concurrency) as executor:
-            future_to_pr = {
-                executor.submit(
-                    assess_pr,
-                    pr_number=pr.number,
-                    pr_title=pr.title,
-                    pr_body=pr.body,
-                    check_status_summary=pr.check_summary,
-                    llm_model=llm_model,
-                ): pr
-                for pr in llm_candidates
-            }
-            for future in as_completed(future_to_pr):
-                pr = future_to_pr[future]
-                assessment = future.result()
-                if assessment.error:
-                    total_llm_errors += 1
-                    continue
-                if not assessment.should_flag:
-                    get_console().print(f"  [success]PR {_pr_link(pr)} passes 
quality check.[/]")
-                    passing_prs.append(pr)
-                    continue
-                assessments[pr.number] = assessment
-
-    total_flagged = len(assessments)
-    summary_parts = [
-        f"{total_deterministic_flags} CI/conflicts/comments",
-        f"{total_flagged - total_deterministic_flags} LLM-flagged",
-    ]
-    if pending_approval:
-        summary_parts.append(f"{len(pending_approval)} awaiting workflow 
approval")
-    if total_llm_errors:
-        summary_parts.append(f"{total_llm_errors} LLM errors")
-    get_console().print(
-        f"\n[info]Assessment complete: {total_flagged} {'PRs' if total_flagged 
!= 1 else 'PR'} "
-        f"flagged ({', '.join(summary_parts)}).[/]\n"
-    )
+        llm_executor = ThreadPoolExecutor(max_workers=llm_concurrency)
+        llm_future_to_pr = {
+            llm_executor.submit(
+                assess_pr,
+                pr_number=pr.number,
+                pr_title=pr.title,
+                pr_body=pr.body,
+                check_status_summary=pr.check_summary,
+                llm_model=llm_model,
+            ): pr
+            for pr in llm_candidates
+        }
 
-    # Phase 5: Present flagged PRs interactively, grouped by author
-    total_converted = 0
-    total_commented = 0
-    total_closed = 0
-    total_ready = 0
-    total_skipped_action = 0
-    quit_early = False
-
-    # Build sorted list of flagged PRs grouped by author
-    flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if 
pr.number in assessments]
-    flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(), 
pair[0].number))
+    # Build shared triage context and stats
     from collections import Counter
 
-    author_flagged_count: dict[str, int] = dict(Counter(pr.author_login for 
pr, _ in flagged_prs))
-
-    current_author: str | None = None
-    for pr, assessment in flagged_prs:
-        if pr.author_login != current_author:
-            current_author = pr.author_login
-            count = author_flagged_count[current_author]
-            get_console().print()
-            get_console().rule(
-                f"[bold]Author: {current_author}[/] ({count} flagged PR{'s' if 
count != 1 else ''})",
-                style="cyan",
-            )
-
-        # Fetch author profile for context (only for flagged PRs)
-        author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
-
-        comment = _build_comment(
-            pr.author_login, assessment.violations, pr.number, 
pr.commits_behind, pr.base_ref
-        )
-        comment_only = _build_comment(
-            pr.author_login,
-            assessment.violations,
-            pr.number,
-            pr.commits_behind,
-            pr.base_ref,
-            comment_only=True,
-        )
-        close_comment = _build_close_comment(
-            pr.author_login,
-            assessment.violations,
-            pr.number,
-            author_flagged_count.get(pr.author_login, 0),
-        )
-        _display_pr_panel(pr, author_profile, assessment, comment)
-
-        default_action, reason = _compute_default_action(pr, assessment, 
author_flagged_count)
-        if default_action == TriageAction.CLOSE:
-            get_console().print(Panel(close_comment, title="Proposed close 
comment", border_style="red"))
-        get_console().print(f"  [bold]{reason}[/]")
-
-        if dry_run:
-            action_label = {
-                TriageAction.DRAFT: "draft",
-                TriageAction.COMMENT: "add comment",
-                TriageAction.CLOSE: "close",
-                TriageAction.READY: "ready",
-                TriageAction.SKIP: "skip",
-            }[default_action]
-            get_console().print(f"[warning]Dry run — would default to: 
{action_label}[/]")
-            continue
-
-        action = prompt_triage_action(
-            f"Action for PR {_pr_link(pr)}?",
-            default=default_action,
-            forced_answer=answer_triage,
-        )
-
-        if action == TriageAction.QUIT:
-            get_console().print("[warning]Quitting.[/]")
-            quit_early = True
-            break
-
-        if action == TriageAction.SKIP:
-            get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — no 
action taken.[/]")
-            total_skipped_action += 1
-            continue
-
-        if action == TriageAction.READY:
-            get_console().print(
-                f"  [info]Marking PR {_pr_link(pr)} as ready — adding 
'{_READY_FOR_REVIEW_LABEL}' label.[/]"
-            )
-            if _add_label(token, github_repository, pr.node_id, 
_READY_FOR_REVIEW_LABEL):
-                get_console().print(
-                    f"  [success]Label '{_READY_FOR_REVIEW_LABEL}' added to PR 
{_pr_link(pr)}.[/]"
-                )
-                total_ready += 1
-            else:
-                get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
-            continue
-
-        if action == TriageAction.COMMENT:
-            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
-            if _post_comment(token, pr.node_id, comment_only):
-                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
-                total_commented += 1
-            else:
-                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
-            continue
-
-        if action == TriageAction.DRAFT:
-            get_console().print(f"  Converting PR {_pr_link(pr)} to draft...")
-            if _convert_pr_to_draft(token, pr.node_id):
-                get_console().print(f"  [success]PR {_pr_link(pr)} converted 
to draft.[/]")
-            else:
-                get_console().print(f"  [error]Failed to convert PR 
{_pr_link(pr)} to draft.[/]")
-                continue
-
-            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
-            if _post_comment(token, pr.node_id, comment):
-                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
-                total_converted += 1
-            else:
-                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
-            continue
-
-        if action == TriageAction.CLOSE:
-            get_console().print(f"  Closing PR {_pr_link(pr)}...")
-            if _close_pr(token, pr.node_id):
-                get_console().print(f"  [success]PR {_pr_link(pr)} closed.[/]")
-            else:
-                get_console().print(f"  [error]Failed to close PR 
{_pr_link(pr)}.[/]")
-                continue
-
-            if _add_label(token, github_repository, pr.node_id, 
_CLOSED_QUALITY_LABEL):
-                get_console().print(
-                    f"  [success]Label '{_CLOSED_QUALITY_LABEL}' added to PR 
{_pr_link(pr)}.[/]"
-                )
-            else:
-                get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
-
-            get_console().print(f"  Posting comment on PR {_pr_link(pr)}...")
-            if _post_comment(token, pr.node_id, close_comment):
-                get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
-                total_closed += 1
-            else:
-                get_console().print(f"  [error]Failed to post comment on PR 
{_pr_link(pr)}.[/]")
-
-    # Phase 5b: Present passing PRs for optional ready-for-review marking
-    if not quit_early and passing_prs:
-        passing_prs.sort(key=lambda p: (p.author_login.lower(), p.number))
-        get_console().print(
-            f"\n[info]{len(passing_prs)} {'PRs pass' if len(passing_prs) != 1 
else 'PR passes'} "
-            f"all checks — review to mark as ready:[/]\n"
-        )
-        for pr in passing_prs:
-            author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
-            _display_pr_info_panels(pr, author_profile)
-
-            if dry_run:
-                get_console().print("[warning]Dry run — skipping.[/]")
-                continue
-
-            action = prompt_triage_action(
-                f"Action for PR {_pr_link(pr)}?",
-                default=TriageAction.SKIP,
-                forced_answer=answer_triage,
-            )
-
-            if action == TriageAction.QUIT:
-                get_console().print("[warning]Quitting.[/]")
-                quit_early = True
-                break
-
-            if action == TriageAction.READY:
-                get_console().print(
-                    f"  [info]Marking PR {_pr_link(pr)} as ready "
-                    f"— adding '{_READY_FOR_REVIEW_LABEL}' label.[/]"
-                )
-                if _add_label(token, github_repository, pr.node_id, 
_READY_FOR_REVIEW_LABEL):
-                    get_console().print(
-                        f"  [success]Label '{_READY_FOR_REVIEW_LABEL}' added 
to PR {_pr_link(pr)}.[/]"
-                    )
-                    total_ready += 1
-                else:
-                    get_console().print(f"  [warning]Failed to add label to PR 
{_pr_link(pr)}.[/]")
-            else:
-                get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — no 
action taken.[/]")
-                total_skipped_action += 1
-
-    # Phase 6: Present NOT_RUN PRs for workflow approval
-    total_workflows_approved = 0
-    if not quit_early and pending_approval:
-        pending_approval.sort(key=lambda p: (p.author_login.lower(), p.number))
-        get_console().print(
-            f"\n[info]{len(pending_approval)} {'PRs have' if 
len(pending_approval) != 1 else 'PR has'} "
-            f"no test workflows run — review and approve workflow runs:[/]\n"
-        )
-        for pr in pending_approval:
-            author_profile = _fetch_author_profile(token, pr.author_login, 
github_repository)
-            pending_runs = _find_pending_workflow_runs(token, 
github_repository, pr.head_sha)
-            _display_workflow_approval_panel(pr, author_profile, pending_runs)
-
-            # If author exceeds the close threshold, suggest closing instead 
of approving
-            author_count = author_flagged_count.get(pr.author_login, 0)
-            if author_count > 3:
-                get_console().print(
-                    f"  [bold red]Author {pr.author_login} has {author_count} 
flagged "
-                    f"{'PRs' if author_count != 1 else 'PR'} "
-                    f"— suggesting close instead of workflow approval.[/]"
-                )
-                close_comment = _build_close_comment(pr.author_login, [], 
pr.number, author_count)
-                get_console().print(Panel(close_comment, title="Proposed close 
comment", border_style="red"))
-
-                if dry_run:
-                    get_console().print("[warning]Dry run — would default to: 
close[/]")
-                    continue
-
-                action = prompt_triage_action(
-                    f"Action for PR {_pr_link(pr)}?",
-                    default=TriageAction.CLOSE,
-                    forced_answer=answer_triage,
-                )
-                if action == TriageAction.QUIT:
-                    get_console().print("[warning]Quitting.[/]")
-                    quit_early = True
-                    break
-                if action == TriageAction.SKIP:
-                    get_console().print(f"  [info]Skipping PR {_pr_link(pr)} — 
no action taken.[/]")
-                    continue
-                if action == TriageAction.CLOSE:
-                    get_console().print(f"  Closing PR {_pr_link(pr)}...")
-                    if _close_pr(token, pr.node_id):
-                        get_console().print(f"  [success]PR {_pr_link(pr)} 
closed.[/]")
-                    else:
-                        get_console().print(f"  [error]Failed to close PR 
{_pr_link(pr)}.[/]")
-                        continue
-                    if _add_label(token, github_repository, pr.node_id, 
_CLOSED_QUALITY_LABEL):
-                        get_console().print(
-                            f"  [success]Label '{_CLOSED_QUALITY_LABEL}' added 
to PR {_pr_link(pr)}.[/]"
-                        )
-                    else:
-                        get_console().print(f"  [warning]Failed to add label 
to PR {_pr_link(pr)}.[/]")
-                    get_console().print(f"  Posting comment on PR 
{_pr_link(pr)}...")
-                    if _post_comment(token, pr.node_id, close_comment):
-                        get_console().print(f"  [success]Comment posted on PR 
{_pr_link(pr)}.[/]")
-                        total_closed += 1
-                    else:
-                        get_console().print(f"  [error]Failed to post comment 
on PR {_pr_link(pr)}.[/]")
-                    continue
-                # For DRAFT or READY, fall through to normal workflow approval
-                # (approve workflows first, then triage later)
-
-            if dry_run:
-                get_console().print("[warning]Dry run — skipping workflow 
approval.[/]")
-                continue
-
-            if not pending_runs:
-                get_console().print(
-                    f"  [dim]No pending workflow runs found for PR 
{_pr_link(pr)}. "
-                    f"Workflows may need to be triggered manually.[/]"
-                )
-                continue
-
-            answer = user_confirm(
-                f"Review diff for PR {_pr_link(pr)} before approving 
workflows?",
-                forced_answer=answer_triage,
-            )
-            if answer == Answer.QUIT:
-                get_console().print("[warning]Quitting.[/]")
-                quit_early = True
-                break
-            if answer == Answer.NO:
-                get_console().print(f"  [info]Skipping workflow approval for 
PR {_pr_link(pr)}.[/]")
-                continue
-
-            get_console().print(f"  Fetching diff for PR {_pr_link(pr)}...")
-            diff_text = _fetch_pr_diff(token, github_repository, pr.number)
-            if diff_text:
-                from rich.syntax import Syntax
-
-                get_console().print(
-                    Panel(
-                        Syntax(diff_text, "diff", theme="monokai", 
word_wrap=True),
-                        title=f"Diff for PR {_pr_link(pr)}",
-                        border_style="bright_cyan",
-                    )
-                )
-            else:
-                get_console().print(
-                    f"  [warning]Could not fetch diff for PR {_pr_link(pr)}. "
-                    f"Review manually at: {pr.url}/files[/]"
-                )
-
-            answer = user_confirm(
-                f"No suspicious changes found in PR {_pr_link(pr)}? "
-                f"Approve {len(pending_runs)} workflow {'runs' if 
len(pending_runs) != 1 else 'run'}?",
-                forced_answer=answer_triage,
-            )
-            if answer == Answer.QUIT:
-                get_console().print("[warning]Quitting.[/]")
-                quit_early = True
-                break
-            if answer == Answer.NO:
-                get_console().print(
-                    f"\n  [bold red]Suspicious changes detected in PR 
{_pr_link(pr)} by {pr.author_login}.[/]"
-                )
-                get_console().print(f"  Fetching all open PRs by 
{pr.author_login}...")
-                author_prs = _fetch_author_open_prs(token, github_repository, 
pr.author_login)
-                if not author_prs:
-                    get_console().print(f"  [dim]No open PRs found for 
{pr.author_login}.[/]")
-                    continue
-
-                get_console().print()
-                get_console().print(
-                    f"  [bold red]The following {len(author_prs)} "
-                    f"{'PRs' if len(author_prs) != 1 else 'PR'} by "
-                    f"{pr.author_login} will be closed, labeled "
-                    f"'{_SUSPICIOUS_CHANGES_LABEL}', and commented:[/]"
-                )
-                for pr_info in author_prs:
-                    get_console().print(
-                        f"    - 
[link={pr_info['url']}]#{pr_info['number']}[/link] {pr_info['title']}"
-                    )
-                get_console().print()
-
-                confirm = user_confirm(
-                    f"Close all {len(author_prs)} {'PRs' if len(author_prs) != 
1 else 'PR'} "
-                    f"by {pr.author_login} and label as suspicious?",
-                    forced_answer=answer_triage,
-                )
-                if confirm == Answer.QUIT:
-                    get_console().print("[warning]Quitting.[/]")
-                    quit_early = True
-                    break
-                if confirm == Answer.NO:
-                    get_console().print(f"  [info]Skipping — no PRs closed for 
{pr.author_login}.[/]")
-                    continue
-
-                closed, commented = _close_suspicious_prs(token, 
github_repository, author_prs, pr.number)
-                get_console().print(
-                    f"  [success]Closed {closed}/{len(author_prs)} "
-                    f"{'PRs' if len(author_prs) != 1 else 'PR'}, commented on 
{commented}.[/]"
-                )
-                total_closed += closed
-                continue
-
-            approved = _approve_workflow_runs(token, github_repository, 
pending_runs)
-            if approved:
-                get_console().print(
-                    f"  [success]Approved {approved}/{len(pending_runs)} 
workflow "
-                    f"{'runs' if len(pending_runs) != 1 else 'run'} for PR "
-                    f"{_pr_link(pr)}.[/]"
-                )
-                total_workflows_approved += 1
-            else:
-                get_console().print(f"  [error]Failed to approve workflow runs 
for PR {_pr_link(pr)}.[/]")
+    author_flagged_count: dict[str, int] = dict(
+        Counter(pr.author_login for pr in candidate_prs if pr.number in 
assessments)
+    )
+    stats = TriageStats()
+    ctx = TriageContext(
+        token=token,
+        github_repository=github_repository,
+        dry_run=dry_run,
+        answer_triage=answer_triage,
+        stats=stats,
+        author_flagged_count=author_flagged_count,
+        llm_future_to_pr=llm_future_to_pr,
+        llm_assessments=llm_assessments,
+        llm_completed=llm_completed,
+        llm_errors=llm_errors,
+        llm_passing=llm_passing,
+    )
 
-    # Summary
-    get_console().print()
-    summary_table = Table(title="Summary")
-    summary_table.add_column("Metric", style="bold")
-    summary_table.add_column("Count", justify="right")
-    total_skipped = total_skipped_collaborator + total_skipped_bot + 
total_skipped_accepted
-    summary_table.add_row("PRs fetched", str(len(all_prs)))
-    if verbose:
-        summary_table.add_row("Collaborators skipped", 
str(total_skipped_collaborator))
-        summary_table.add_row("Bots skipped", str(total_skipped_bot))
-        summary_table.add_row("Ready-for-review skipped", 
str(total_skipped_accepted))
-    summary_table.add_row("PRs skipped (filtered)", str(total_skipped))
-    summary_table.add_row("PRs assessed", str(len(candidate_prs)))
-    summary_table.add_row("Flagged by CI/conflicts/comments", 
str(total_deterministic_flags))
-    summary_table.add_row("Flagged by LLM", str(total_flagged - 
total_deterministic_flags))
-    summary_table.add_row("LLM errors (skipped)", str(total_llm_errors))
-    summary_table.add_row("Total flagged", str(total_flagged))
-    summary_table.add_row("PRs passing all checks", str(len(passing_prs)))
-    summary_table.add_row("PRs converted to draft", str(total_converted))
-    summary_table.add_row("PRs commented (not drafted)", str(total_commented))
-    summary_table.add_row("PRs closed", str(total_closed))
-    summary_table.add_row("PRs marked ready for review", str(total_ready))
-    summary_table.add_row("PRs skipped (no action)", str(total_skipped_action))
-    summary_table.add_row("Awaiting workflow approval", 
str(len(pending_approval)))
-    summary_table.add_row("PRs with workflows approved", 
str(total_workflows_approved))
-    get_console().print(summary_table)
+    # Phase 4b: Present NOT_RUN PRs for workflow approval (LLM runs in 
background)
+    _review_workflow_approval_prs(ctx, pending_approval)
+
+    # Phase 5a: Present deterministically flagged PRs
+    det_flagged_prs = [(pr, assessments[pr.number]) for pr in candidate_prs if 
pr.number in assessments]
+    det_flagged_prs.sort(key=lambda pair: (pair[0].author_login.lower(), 
pair[0].number))
+    _review_deterministic_flagged_prs(ctx, det_flagged_prs)
+
+    # Phase 5b: Present LLM-flagged PRs as they become ready (streaming)
+    _review_llm_flagged_prs(ctx, llm_candidates)
+
+    # Add LLM passing PRs to the passing list
+    passing_prs.extend(llm_passing)
+
+    # Phase 5c: Present passing PRs for optional ready-for-review marking
+    _review_passing_prs(ctx, passing_prs)
+
+    # Shut down LLM executor if it was started
+    if llm_executor is not None:
+        llm_executor.shutdown(wait=False)

Review Comment:
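   nit: When the user quits early, `shutdown(wait=False)` returns immediately, but 
in-flight LLM calls keep running in their worker threads and any queued calls 
will still be started, so the process may not exit until they all finish. 
I think this might happen — WDYT?
   Would `cancel_futures=True` be useful here? It would at least cancel the 
calls that have not started yet (it cannot stop ones that are already running).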



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to