This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 247551da1d0 Add CI duration trend monitor to warn on slow main builds (#68368)
247551da1d0 is described below
commit 247551da1d069ff6f2c4d32a5f2dc739f44d71a4
Author: Jarek Potiuk <[email protected]>
AuthorDate: Sun Jun 14 01:39:44 2026 +0200
Add CI duration trend monitor to warn on slow main builds (#68368)
Adds an automated warning when CI run times on `main` creep above the
recent trend, so slowdowns are caught early instead of at the timeout
cliff (as recently happened with the MySQL tests).
A daily scheduled workflow runs a new analysis script that fetches the
recent `schedule`-event runs of `ci-amd.yml` on `main` (the post-merge
canaries), computes the wall-clock duration of each run and each job,
and compares the latest run against a robust median baseline of the
preceding runs. A regression is only flagged when the increase clears
both a relative threshold (default +25%) and an absolute floor, so
short, noisy jobs do not raise spurious alerts. When a regression is
detected it posts to the internal-airflow-ci-cd Slack channel; otherwise
it stays quiet.
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
.github/workflows/ci-duration-monitor.yml | 69 +++
scripts/ci/analyze_ci_job_durations.py | 586 ++++++++++++++++++++++
scripts/tests/ci/test_analyze_ci_job_durations.py | 310 ++++++++++++
3 files changed, 965 insertions(+)
diff --git a/.github/workflows/ci-duration-monitor.yml b/.github/workflows/ci-duration-monitor.yml
new file mode 100644
index 00000000000..bc0f093203a
--- /dev/null
+++ b/.github/workflows/ci-duration-monitor.yml
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "CI Duration Monitor"
+on: # yamllint disable-line rule:truthy
+ schedule:
+ # A few hours after the daily scheduled CI canaries, so the latest main run is in the window.
+ - cron: '0 9 * * *'
+ workflow_dispatch:
+permissions:
+ contents: read
+ actions: read
+env:
+ GITHUB_REPOSITORY: ${{ github.repository }}
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
+
+jobs:
+
+ monitor-ci-durations:
+ name: "Monitor CI durations on main"
+ runs-on: ubuntu-latest
+ steps:
+ - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+ uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
+ with:
+ persist-credentials: false
+
+ - name: "Analyze CI job durations"
+ id: analyze
+ run: python3 scripts/ci/analyze_ci_job_durations.py
+ env:
+ GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ # main coverage comes from the scheduled canary runs of the AMD workflow.
+ WORKFLOW_NAME: "ci-amd.yml"
+ BRANCH: "main"
+ MAX_RUNS: "25"
+ OUTPUT_FILE: "slack-message.json"
+
+ - name: "Post duration alert to Slack"
+ if: steps.analyze.outputs.has-regression == 'true'
+ uses: slackapi/slack-github-action@45a88b9581bfab2566dc881e2cd66d334e621e2c # v3.0.3
+ with:
+ method: chat.postMessage
+ token: ${{ env.SLACK_BOT_TOKEN }}
+ payload-file-path: "slack-message.json"
+
+ - name: "Upload duration analysis"
+ uses: actions/upload-artifact@043fb46d1a93c77aae656e7c1c64a875d1fc6a0a # v7.0.1
+ with:
+ name: "ci-duration-analysis"
+ path: slack-message.json
+ retention-days: 14
+ if: always() && steps.analyze.outputs.has-regression == 'true'
diff --git a/scripts/ci/analyze_ci_job_durations.py b/scripts/ci/analyze_ci_job_durations.py
new file mode 100644
index 00000000000..6cc512f645b
--- /dev/null
+++ b/scripts/ci/analyze_ci_job_durations.py
@@ -0,0 +1,586 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# /// script
+# requires-python = ">=3.10"
+# ///
+"""
+Watch CI job durations on ``main`` and warn when they creep above the recent
trend.
+
+Motivation: CI run times can drift upwards slowly (new tests, slower runners,
+queue pressure) and nobody notices until a job starts timing out. This script
+fetches the recent completed runs of the main CI workflow, computes the
wall-clock
+duration of each run and of each individual job, and compares the latest run(s)
+against a robust baseline built from the preceding runs. When the latest
duration
+is meaningfully above that baseline it emits a warning so regressions are
caught
+early rather than at the timeout cliff.
+
+The baseline is the *median* of the preceding window (robust to the odd slow
run),
+and a regression is only flagged when the latest median exceeds the baseline by
+both a relative margin (``REL_THRESHOLD``) and an absolute floor
+(``MIN_ABS_INCREASE_MINUTES`` / ``JOB_MIN_ABS_INCREASE_MINUTES``) so short jobs
+with noisy timings do not trigger spurious alerts.
+
+Environment variables (required):
+ GITHUB_REPOSITORY - Owner/repo (e.g. apache/airflow)
+ GITHUB_TOKEN - GitHub token for API access (used by ``gh``)
+
+Environment variables (optional):
+ WORKFLOW_NAME - Workflow file to query (default: ci-amd.yml)
+ BRANCH - Branch to filter runs (default: main)
+ EVENT - Trigger event to restrict runs to, e.g. "schedule"; the main
+ post-merge canaries. Empty string = all events (default: schedule)
+ MAX_RUNS - Window of completed runs to analyze (default: 25)
+ LATEST_RUNS - Most-recent runs compared against the baseline (default: 1)
+ MIN_BASELINE_RUNS - Minimum baseline runs needed to compute a trend (default: 5)
+ REL_THRESHOLD - Relative increase over baseline to flag, e.g. 0.25 = 25% (default: 0.25)
+ MIN_ABS_INCREASE_MINUTES - Absolute floor for the overall-run alert (default: 5)
+ JOB_MIN_ABS_INCREASE_MINUTES - Absolute floor for per-job alerts (default: 3)
+ ANALYZE_JOBS - Whether to fetch per-job durations ("true"/"false", default: true)
+ ONLY_SUCCESSFUL - Only consider runs that concluded "success" (default: true)
+ SLACK_CHANNEL - Slack channel for the message payload (default: internal-airflow-ci-cd)
+ OUTPUT_FILE - Path for the Slack message output (default: slack-message.json)
+ GITHUB_OUTPUT - Path to GitHub Actions output file
+ GITHUB_STEP_SUMMARY - Path to GitHub Actions step summary file
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import subprocess
+import sys
+from datetime import datetime
+from pathlib import Path
+
+ISO_SUFFIX_Z = "Z"
+
+
+def env_float(name: str, default: float) -> float:
+ """Read a float environment variable, falling back to ``default`` when
unset/invalid."""
+ raw = os.environ.get(name)
+ if raw is None or raw.strip() == "":
+ return default
+ try:
+ return float(raw)
+ except ValueError:
+ print(f"Invalid float for {name}={raw!r}; using default {default}",
file=sys.stderr)
+ return default
+
+
+def env_int(name: str, default: int) -> int:
+ """Read an int environment variable, falling back to ``default`` when
unset/invalid."""
+ raw = os.environ.get(name)
+ if raw is None or raw.strip() == "":
+ return default
+ try:
+ return int(raw)
+ except ValueError:
+ print(f"Invalid int for {name}={raw!r}; using default {default}",
file=sys.stderr)
+ return default
+
+
+def env_bool(name: str, default: bool) -> bool:
+ """Read a boolean environment variable ("true"/"false")."""
+ raw = os.environ.get(name)
+ if raw is None or raw.strip() == "":
+ return default
+ return raw.strip().lower() in {"1", "true", "yes", "on"}
+
+
+def escape_slack_mrkdwn(text: str) -> str:
+ """Escape special characters for Slack mrkdwn format."""
+ text = text.replace("&", "&amp;")
+ text = text.replace("<", "&lt;")
+ text = text.replace(">", "&gt;")
+ return text
+
+
+def gh_api(endpoint: str, **kwargs: str) -> str | None:
+ """Call GitHub API via gh CLI.
+
+ Forces ``--method GET``: ``gh api`` defaults to POST whenever ``-f``
+ parameters are present, which makes read-only endpoints (such as the
+ workflow runs list) return 404.
+ """
+ cmd = ["gh", "api", "--method", "GET", endpoint]
+ for key, value in kwargs.items():
+ cmd.extend(["-f", f"{key}={value}"])
+ result = subprocess.run(cmd, capture_output=True, text=True, check=False)
+ if result.returncode != 0:
+ print(f"gh api error for {endpoint}: {result.stderr}", file=sys.stderr)
+ return None
+ return result.stdout.strip()
+
+
+def parse_iso(timestamp: str | None) -> datetime | None:
+ """Parse an ISO-8601 timestamp (with trailing ``Z``) to an aware
datetime."""
+ if not timestamp:
+ return None
+ try:
+ # ``datetime.fromisoformat`` only learned to parse the ``Z`` suffix in 3.11.
+ if timestamp.endswith(ISO_SUFFIX_Z):
+ timestamp = timestamp[:-1] + "+00:00"
+ return datetime.fromisoformat(timestamp)
+ except ValueError:
+ return None
+
+
+def duration_seconds(start: str | None, end: str | None) -> float | None:
+ """Return the number of seconds between two ISO timestamps, or None if
unparsable."""
+ start_dt = parse_iso(start)
+ end_dt = parse_iso(end)
+ if start_dt is None or end_dt is None:
+ return None
+ seconds = (end_dt - start_dt).total_seconds()
+ if seconds < 0:
+ return None
+ return seconds
+
+
+def median(values: list[float]) -> float:
+ """Return the median of a non-empty list of values."""
+ ordered = sorted(values)
+ n = len(ordered)
+ mid = n // 2
+ if n % 2 == 1:
+ return ordered[mid]
+ return (ordered[mid - 1] + ordered[mid]) / 2
+
+
+def format_duration(seconds: float) -> str:
+ """Format a duration in seconds as e.g. ``29m 41s``."""
+ total = int(round(seconds))
+ minutes, secs = divmod(total, 60)
+ if minutes == 0:
+ return f"{secs}s"
+ return f"{minutes}m {secs:02d}s"
+
+
+# Wall-clock shorter than this almost always means a run that was cancelled,
+# skipped by selective checks, or never really executed the test matrix — not a
+# representative "main build". Such runs would corrupt the duration baseline.
+MIN_VALID_RUN_SECONDS = 120
+
+
+def get_recent_runs(
+ repo: str, workflow: str, branch: str, max_runs: int, only_successful:
bool, event: str
+) -> list[dict]:
+ """Get recent completed workflow runs (newest first) with timing metadata.
+
+ ``event`` (e.g. ``schedule``) restricts the result to a single trigger
type so
+ that PR runs targeting ``main`` are not mixed in with the post-merge canary
+ runs. Pass an empty string to include every event.
+ """
+ # Over-fetch when filtering to success so we still end up with ~max_runs usable runs.
+ per_page = max_runs * 2 if only_successful else max_runs
+ per_page = min(per_page, 100)
+ params = {
+ "branch": branch,
+ "per_page": str(per_page),
+ "status": "completed",
+ }
+ if event:
+ params["event"] = event
+ output = gh_api(f"repos/{repo}/actions/workflows/{workflow}/runs",
**params)
+ if not output:
+ return []
+ try:
+ data = json.loads(output)
+ except json.JSONDecodeError:
+ return []
+
+ runs: list[dict] = []
+ for run in data.get("workflow_runs", []):
+ if only_successful and run.get("conclusion") != "success":
+ continue
+ seconds = duration_seconds(run.get("run_started_at"),
run.get("updated_at"))
+ if seconds is None or seconds < MIN_VALID_RUN_SECONDS:
+ continue
+ runs.append(
+ {
+ "id": run["id"],
+ "run_number": run.get("run_number", "?"),
+ "created_at": run.get("created_at", ""),
+ "conclusion": run.get("conclusion", "unknown"),
+ "event": run.get("event", "unknown"),
+ "html_url": run.get("html_url", ""),
+ "duration": seconds,
+ }
+ )
+ if len(runs) >= max_runs:
+ break
+ return runs
+
+
+def get_run_jobs(repo: str, run_id: int) -> dict[str, float]:
+ """Return a mapping of job name -> duration in seconds for a single run.
+
+ Only jobs that completed successfully are included, so that a job which was
+ cancelled or skipped on a particular run does not pollute its duration
trend.
+ """
+ result = subprocess.run(
+ ["gh", "run", "view", str(run_id), "--repo", repo, "--json", "jobs"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ if result.returncode != 0:
+ print(f"Could not fetch jobs for run {run_id}: {result.stderr}",
file=sys.stderr)
+ return {}
+ try:
+ data = json.loads(result.stdout)
+ except json.JSONDecodeError:
+ return {}
+
+ durations: dict[str, float] = {}
+ for job in data.get("jobs", []):
+ if job.get("conclusion") != "success":
+ continue
+ seconds = duration_seconds(job.get("startedAt"),
job.get("completedAt"))
+ if seconds is None:
+ continue
+ name = job.get("name", "unknown")
+ # A matrix can surface the same job name more than once per run; keep the longest.
+ durations[name] = max(durations.get(name, 0.0), seconds)
+ return durations
+
+
+def detect_regression(
+ latest_values: list[float],
+ baseline_values: list[float],
+ rel_threshold: float,
+ min_abs_increase_seconds: float,
+) -> dict | None:
+ """Compare latest durations against a baseline window.
+
+ Returns a dict describing the regression when the latest median is above
the
+ baseline median by both the relative threshold and the absolute floor,
else None.
+ """
+ if not latest_values or not baseline_values:
+ return None
+ latest = median(latest_values)
+ baseline = median(baseline_values)
+ increase = latest - baseline
+ rel_increase = increase / baseline if baseline > 0 else 0.0
+ if increase >= min_abs_increase_seconds and rel_increase >= rel_threshold:
+ return {
+ "latest": latest,
+ "baseline": baseline,
+ "increase": increase,
+ "rel_increase": rel_increase,
+ }
+ return None
+
+
+def analyze_jobs(
+ repo: str,
+ latest_runs: list[dict],
+ baseline_runs: list[dict],
+ min_baseline_runs: int,
+ rel_threshold: float,
+ min_abs_increase_seconds: float,
+) -> list[dict]:
+ """Fetch per-job durations and return the jobs whose latest duration
regressed."""
+ latest_job_durations: dict[str, list[float]] = {}
+ for run in latest_runs:
+ for name, seconds in get_run_jobs(repo, run["id"]).items():
+ latest_job_durations.setdefault(name, []).append(seconds)
+
+ baseline_job_durations: dict[str, list[float]] = {}
+ for run in baseline_runs:
+ for name, seconds in get_run_jobs(repo, run["id"]).items():
+ baseline_job_durations.setdefault(name, []).append(seconds)
+
+ regressions: list[dict] = []
+ for name, latest_values in latest_job_durations.items():
+ baseline_values = baseline_job_durations.get(name, [])
+ if len(baseline_values) < min_baseline_runs:
+ continue
+ regression = detect_regression(
+ latest_values, baseline_values, rel_threshold,
min_abs_increase_seconds
+ )
+ if regression:
+ regression["job"] = name
+ regressions.append(regression)
+
+ regressions.sort(key=lambda r: r["rel_increase"], reverse=True)
+ return regressions
+
+
+def format_slack_message(
+ repo: str,
+ workflow: str,
+ branch: str,
+ overall_regression: dict | None,
+ job_regressions: list[dict],
+ recent_runs: list[dict],
+ rel_threshold: float,
+ channel: str,
+) -> dict:
+ """Format the regression report as a Slack Block Kit message."""
+ blocks: list[dict] = [
+ {
+ "type": "header",
+ "text": {"type": "plain_text", "text": "⏱️ CI Duration Trend Alert"},
+ },
+ {
+ "type": "section",
+ "text": {
+ "type": "mrkdwn",
+ "text": (
+ f"CI run times on *{escape_slack_mrkdwn(branch)}* "
+ f"(`{escape_slack_mrkdwn(workflow)}`) have risen above the recent trend "
+ f"(baseline = median of the preceding runs; threshold = "
+ f"+{int(rel_threshold * 100)}%)."
+ ),
+ },
+ },
+ {"type": "divider"},
+ ]
+
+ if overall_regression:
+ blocks.append(
+ {
+ "type": "section",
+ "text": {
+ "type": "mrkdwn",
+ "text": (
+ "*Overall run wall-clock regressed:*\n"
+ f"• Latest: *{format_duration(overall_regression['latest'])}*\n"
+ f"• Baseline: {format_duration(overall_regression['baseline'])}\n"
+ f"• Increase: *+{format_duration(overall_regression['increase'])}* "
+ f"(+{round(overall_regression['rel_increase'] * 100, 1)}%)"
+ ),
+ },
+ }
+ )
+
+ if job_regressions:
+ lines = ["*Jobs that got slower:*"]
+ for reg in job_regressions[:15]:
+ lines.append(
+ f"• *{escape_slack_mrkdwn(reg['job'])}* — "
+ f"{format_duration(reg['baseline'])} → *{format_duration(reg['latest'])}* "
+ f"(+{round(reg['rel_increase'] * 100, 1)}%)"
+ )
+ text = "\n".join(lines)
+ if len(text) > 2900:
+ text = text[:2900] + "\n_...truncated_"
+ blocks.append({"type": "section", "text": {"type": "mrkdwn", "text":
text}})
+ if len(job_regressions) > 15:
+ blocks.append(
+ {
+ "type": "context",
+ "elements": [
+ {
+ "type": "mrkdwn",
+ "text": f"_...and {len(job_regressions) - 15} more slower jobs_",
+ }
+ ],
+ }
+ )
+
+ if recent_runs:
+ latest_run = recent_runs[0]
+ blocks.append({"type": "divider"})
+ blocks.append(
+ {
+ "type": "context",
+ "elements": [
+ {
+ "type": "mrkdwn",
+ "text": (
+ f"<https://github.com/{repo}/actions/workflows/{workflow}"
+ f"?query=branch%3A{branch}|View {escape_slack_mrkdwn(workflow)} runs> | "
+ f"Latest: <{latest_run['html_url']}|Run #{latest_run['run_number']}> "
+ f"({format_duration(latest_run['duration'])})"
+ ),
+ }
+ ],
+ }
+ )
+
+ fallback_parts = []
+ if overall_regression:
+ fallback_parts.append(f"overall +{round(overall_regression['rel_increase'] * 100, 1)}%")
+ if job_regressions:
+ fallback_parts.append(f"{len(job_regressions)} slower job(s)")
+ fallback = f"CI Duration Trend Alert on {branch}: " + ", ".join(fallback_parts)
+
+ return {
+ "channel": channel,
+ "text": fallback,
+ "blocks": blocks,
+ }
+
+
+def write_step_summary(
+ workflow: str,
+ branch: str,
+ overall_regression: dict | None,
+ job_regressions: list[dict],
+ recent_runs: list[dict],
+ baseline_count: int,
+) -> None:
+ """Write a GitHub Actions step summary in markdown."""
+ summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
+ if not summary_path:
+ return
+
+ lines = [
+ "## ⏱️ CI Duration Trend",
+ "",
+ f"Workflow `{workflow}` on `{branch}` — baseline from {baseline_count} preceding runs.",
+ "",
+ ]
+
+ if overall_regression:
+ lines += [
+ "### ⚠️ Overall run regressed",
+ "",
+ f"- Latest: **{format_duration(overall_regression['latest'])}**",
+ f"- Baseline: {format_duration(overall_regression['baseline'])}",
+ f"- Increase: **+{format_duration(overall_regression['increase'])}** "
+ f"(+{round(overall_regression['rel_increase'] * 100, 1)}%)",
+ "",
+ ]
+ else:
+ lines += ["### ✅ Overall run within trend", ""]
+
+ if job_regressions:
+ lines += [
+ "### ⚠️ Slower jobs",
+ "",
+ "| Job | Baseline | Latest | Increase |",
+ "|-----|----------|--------|----------|",
+ ]
+ for reg in job_regressions[:25]:
+ lines.append(
+ f"| {reg['job']} | {format_duration(reg['baseline'])} | "
+ f"{format_duration(reg['latest'])} | +{round(reg['rel_increase'] * 100, 1)}% |"
+ )
+ lines.append("")
+ else:
+ lines += ["### ✅ No individual job regressed", ""]
+
+ if recent_runs:
+ lines += [
+ "### Recent run durations",
+ "",
+ "| Run | Event | Duration |",
+ "|-----|-------|----------|",
+ ]
+ for run in recent_runs[:15]:
+ lines.append(
+ f"| [#{run['run_number']}]({run['html_url']}) | {run['event']} | "
+ f"{format_duration(run['duration'])} |"
+ )
+ lines.append("")
+
+ with open(summary_path, "a") as f:
+ f.write("\n".join(lines))
+
+
+def main() -> None:
+ repo = os.environ.get("GITHUB_REPOSITORY", "apache/airflow")
+ workflow = os.environ.get("WORKFLOW_NAME", "ci-amd.yml")
+ branch = os.environ.get("BRANCH", "main")
+ event = os.environ.get("EVENT", "schedule")
+ max_runs = env_int("MAX_RUNS", 25)
+ latest_runs_count = env_int("LATEST_RUNS", 1)
+ min_baseline_runs = env_int("MIN_BASELINE_RUNS", 5)
+ rel_threshold = env_float("REL_THRESHOLD", 0.25)
+ min_abs_increase_seconds = env_float("MIN_ABS_INCREASE_MINUTES", 5.0) * 60
+ job_min_abs_increase_seconds = env_float("JOB_MIN_ABS_INCREASE_MINUTES",
3.0) * 60
+ do_analyze_jobs = env_bool("ANALYZE_JOBS", True)
+ only_successful = env_bool("ONLY_SUCCESSFUL", True)
+ channel = os.environ.get("SLACK_CHANNEL", "internal-airflow-ci-cd")
+ output_file = Path(os.environ.get("OUTPUT_FILE", "slack-message.json"))
+
+ event_label = event or "all events"
+ print(f"Analyzing CI durations for {repo} ({workflow} on {branch}, event={event_label})")
+ print(f"Window: up to {max_runs} completed runs; latest {latest_runs_count} vs baseline.")
+
+ runs = get_recent_runs(repo, workflow, branch, max_runs, only_successful,
event)
+ if len(runs) < latest_runs_count + min_baseline_runs:
+ print(
+ f"Not enough runs to establish a trend "
+ f"(found {len(runs)}, need {latest_runs_count + min_baseline_runs}). Skipping."
+ )
+ _write_outputs(False, False, 0)
+ sys.exit(0)
+
+ latest_runs = runs[:latest_runs_count]
+ baseline_runs = runs[latest_runs_count:]
+ print(f"Latest runs: {len(latest_runs)}; baseline runs: {len(baseline_runs)}.")
+
+ overall_regression = detect_regression(
+ [r["duration"] for r in latest_runs],
+ [r["duration"] for r in baseline_runs],
+ rel_threshold,
+ min_abs_increase_seconds,
+ )
+ if overall_regression:
+ print(
+ f"Overall regression: {format_duration(overall_regression['baseline'])} -> "
+ f"{format_duration(overall_regression['latest'])} "
+ f"(+{round(overall_regression['rel_increase'] * 100, 1)}%)"
+ )
+ else:
+ print("Overall run duration is within the recent trend.")
+
+ job_regressions: list[dict] = []
+ if do_analyze_jobs:
+ job_regressions = analyze_jobs(
+ repo,
+ latest_runs,
+ baseline_runs,
+ min_baseline_runs,
+ rel_threshold,
+ job_min_abs_increase_seconds,
+ )
+ print(f"Jobs that regressed: {len(job_regressions)}")
+
+ has_regression = bool(overall_regression) or bool(job_regressions)
+
+ if has_regression:
+ slack_message = format_slack_message(
+ repo, workflow, branch, overall_regression, job_regressions, runs,
rel_threshold, channel
+ )
+ output_file.write_text(json.dumps(slack_message, indent=2))
+ print(f"Slack message written to: {output_file}")
+ else:
+ print("No regression detected; no Slack message written.")
+
+ write_step_summary(workflow, branch, overall_regression, job_regressions,
runs, len(baseline_runs))
+ _write_outputs(has_regression, bool(overall_regression),
len(job_regressions))
+
+
+def _write_outputs(has_regression: bool, overall_regression: bool,
regressed_jobs: int) -> None:
+ """Write GitHub Actions outputs used to gate the Slack-notify step."""
+ github_output = os.environ.get("GITHUB_OUTPUT")
+ if not github_output:
+ return
+ with open(github_output, "a") as f:
+ f.write(f"has-regression={str(has_regression).lower()}\n")
+ f.write(f"overall-regression={str(overall_regression).lower()}\n")
+ f.write(f"regressed-jobs={regressed_jobs}\n")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/tests/ci/test_analyze_ci_job_durations.py b/scripts/tests/ci/test_analyze_ci_job_durations.py
new file mode 100644
index 00000000000..432fefe44a5
--- /dev/null
+++ b/scripts/tests/ci/test_analyze_ci_job_durations.py
@@ -0,0 +1,310 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib.util
+import json
+import subprocess
+import sys
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+MODULE_PATH = Path(__file__).resolve().parents[3] / "scripts" / "ci" / "analyze_ci_job_durations.py"
+
+
+@pytest.fixture
+def durations_module():
+ module_name = "test_analyze_ci_job_durations_module"
+ sys.modules.pop(module_name, None)
+ spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH)
+ assert spec is not None
+ assert spec.loader is not None
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+class TestGhApi:
+ def test_forces_get_method(self, durations_module):
+ """`gh api` defaults to POST when -f is passed; we must force GET to
avoid 404."""
+ completed = subprocess.CompletedProcess(args=[], returncode=0,
stdout="{}", stderr="")
+ with patch.object(subprocess, "run", return_value=completed) as mock_run:
+ durations_module.gh_api("repos/apache/airflow/actions/workflows/x/runs", branch="main")
+ args = mock_run.call_args[0][0]
+ assert "--method" in args
+ assert args[args.index("--method") + 1] == "GET"
+
+
+class TestParseIso:
+ def test_parses_z_suffix(self, durations_module):
+ dt = durations_module.parse_iso("2026-06-10T13:01:53Z")
+ assert dt is not None
+ assert dt.year == 2026 and dt.hour == 13
+
+ def test_parses_offset(self, durations_module):
+ dt = durations_module.parse_iso("2026-06-10T13:01:53+00:00")
+ assert dt is not None
+
+ def test_none_for_empty(self, durations_module):
+ assert durations_module.parse_iso("") is None
+ assert durations_module.parse_iso(None) is None
+
+ def test_none_for_garbage(self, durations_module):
+ assert durations_module.parse_iso("not-a-date") is None
+
+
+class TestDurationSeconds:
+ def test_computes_positive_duration(self, durations_module):
+ seconds = durations_module.duration_seconds("2026-06-10T13:00:00Z",
"2026-06-10T13:29:00Z")
+ assert seconds == 29 * 60
+
+ def test_none_when_unparsable(self, durations_module):
+ assert durations_module.duration_seconds("bad",
"2026-06-10T13:29:00Z") is None
+
+ def test_none_when_negative(self, durations_module):
+ """A clock skew / out-of-order pair must not produce a negative
duration."""
+ assert durations_module.duration_seconds("2026-06-10T13:29:00Z",
"2026-06-10T13:00:00Z") is None
+
+
+class TestMedian:
+ def test_odd(self, durations_module):
+ assert durations_module.median([3, 1, 2]) == 2
+
+ def test_even(self, durations_module):
+ assert durations_module.median([1, 2, 3, 4]) == 2.5
+
+
+class TestFormatDuration:
+ def test_minutes_and_seconds(self, durations_module):
+ assert durations_module.format_duration(29 * 60 + 41) == "29m 41s"
+
+ def test_seconds_only(self, durations_module):
+ assert durations_module.format_duration(45) == "45s"
+
+ def test_zero_pads_seconds(self, durations_module):
+ assert durations_module.format_duration(60 + 5) == "1m 05s"
+
+
+class TestDetectRegression:
+ def test_flags_regression_above_both_thresholds(self, durations_module):
+ # baseline median ~1800s (30m), latest 2700s (45m) -> +50%, +15m
+ regression = durations_module.detect_regression(
+ latest_values=[2700],
+ baseline_values=[1800, 1810, 1790, 1805, 1795],
+ rel_threshold=0.25,
+ min_abs_increase_seconds=300,
+ )
+ assert regression is not None
+ assert regression["latest"] == 2700
+ assert round(regression["rel_increase"], 2) == 0.5
+
+ def test_no_regression_below_relative_threshold(self, durations_module):
+ # +5% only — under the 25% relative threshold even though absolute is large
+ regression = durations_module.detect_regression(
+ latest_values=[6300],
+ baseline_values=[6000, 6000, 6000, 6000, 6000],
+ rel_threshold=0.25,
+ min_abs_increase_seconds=300,
+ )
+ assert regression is None
+
+ def test_no_regression_below_absolute_floor(self, durations_module):
+ # +50% relative but only +60s absolute — under the 300s floor (noisy short job)
+ regression = durations_module.detect_regression(
+ latest_values=[180],
+ baseline_values=[120, 120, 120, 120, 120],
+ rel_threshold=0.25,
+ min_abs_increase_seconds=300,
+ )
+ assert regression is None
+
+ def test_robust_to_single_baseline_outlier(self, durations_module):
+ # One slow baseline run should not move the median enough to mask a real regression.
+ regression = durations_module.detect_regression(
+ latest_values=[2700],
+ baseline_values=[1800, 1800, 1800, 1800, 5000],
+ rel_threshold=0.25,
+ min_abs_increase_seconds=300,
+ )
+ assert regression is not None
+
+ def test_empty_inputs(self, durations_module):
+ assert durations_module.detect_regression([], [1, 2], 0.25, 300) is None
+ assert durations_module.detect_regression([1], [], 0.25, 300) is None
+
+
+class TestGetRecentRuns:
+ def _runs_payload(self):
+ return json.dumps(
+ {
+ "workflow_runs": [
+ {
+ "id": 2,
+ "run_number": 102,
+ "conclusion": "success",
+ "event": "schedule",
+ "html_url": "https://example/2",
+ "run_started_at": "2026-06-10T13:00:00Z",
+ "updated_at": "2026-06-10T13:45:00Z",
+ },
+ {
+ "id": 1,
+ "run_number": 101,
+ "conclusion": "failure",
+ "event": "schedule",
+ "html_url": "https://example/1",
+ "run_started_at": "2026-06-09T13:00:00Z",
+ "updated_at": "2026-06-09T13:30:00Z",
+ },
+ {
+ # A cancelled/skipped run with a near-zero wall-clock must be dropped
+ # so it cannot drag the baseline down.
+ "id": 3,
+ "run_number": 103,
+ "conclusion": "success",
+ "event": "schedule",
+ "html_url": "https://example/3",
+ "run_started_at": "2026-06-10T14:00:00Z",
+ "updated_at": "2026-06-10T14:00:30Z",
+ },
+ ]
+ }
+ )
+
+ def test_filters_to_successful_and_computes_duration(self,
durations_module):
+ with patch.object(durations_module, "gh_api",
return_value=self._runs_payload()):
+ runs = durations_module.get_recent_runs(
+ "apache/airflow", "ci-amd.yml", "main", max_runs=25,
only_successful=True, event="schedule"
+ )
+ # Only the 45-minute successful run survives: the failure and the 30s run are dropped.
+ assert len(runs) == 1
+ assert runs[0]["id"] == 2
+ assert runs[0]["duration"] == 45 * 60
+
+ def test_includes_failures_when_not_filtering(self, durations_module):
+ with patch.object(durations_module, "gh_api",
return_value=self._runs_payload()):
+ runs = durations_module.get_recent_runs(
+ "apache/airflow", "ci-amd.yml", "main", max_runs=25,
only_successful=False, event="schedule"
+ )
+ # The failure is kept, but the 30s run is still dropped as non-representative.
+ assert len(runs) == 2
+ assert {r["id"] for r in runs} == {1, 2}
+
+ def test_passes_event_to_api(self, durations_module):
+ with patch.object(durations_module, "gh_api",
return_value=self._runs_payload()) as mock_api:
+ durations_module.get_recent_runs(
+ "apache/airflow", "ci-amd.yml", "main", max_runs=25,
only_successful=True, event="schedule"
+ )
+ assert mock_api.call_args.kwargs.get("event") == "schedule"
+
+ def test_omits_event_when_empty(self, durations_module):
+ with patch.object(durations_module, "gh_api",
return_value=self._runs_payload()) as mock_api:
+ durations_module.get_recent_runs(
+ "apache/airflow", "ci-amd.yml", "main", max_runs=25,
only_successful=True, event=""
+ )
+ assert "event" not in mock_api.call_args.kwargs
+
+ def test_empty_on_api_failure(self, durations_module):
+ with patch.object(durations_module, "gh_api", return_value=None):
+ runs = durations_module.get_recent_runs(
+ "apache/airflow", "ci-amd.yml", "main", max_runs=25,
only_successful=True, event="schedule"
+ )
+ assert runs == []
+
+
+class TestGetRunJobs:
+ def test_parses_successful_jobs(self, durations_module):
+ payload = json.dumps(
+ {
+ "jobs": [
+ {
+ "name": "Tests",
+ "conclusion": "success",
+ "startedAt": "2026-06-10T13:00:00Z",
+ "completedAt": "2026-06-10T13:20:00Z",
+ },
+ {
+ "name": "Skipped job",
+ "conclusion": "skipped",
+ "startedAt": "2026-06-10T13:00:00Z",
+ "completedAt": "2026-06-10T13:00:00Z",
+ },
+ ]
+ }
+ )
+ completed = subprocess.CompletedProcess(args=[], returncode=0,
stdout=payload, stderr="")
+ with patch.object(subprocess, "run", return_value=completed):
+ jobs = durations_module.get_run_jobs("apache/airflow", 2)
+ assert jobs == {"Tests": 20 * 60}
+
+ def test_empty_on_command_failure(self, durations_module):
+ completed = subprocess.CompletedProcess(args=[], returncode=1,
stdout="", stderr="boom")
+ with patch.object(subprocess, "run", return_value=completed):
+ assert durations_module.get_run_jobs("apache/airflow", 2) == {}
+
+
+class TestAnalyzeJobs:
+ def test_reports_only_regressed_jobs_with_enough_baseline(self,
durations_module):
+ latest_runs = [{"id": 100}]
+ baseline_runs = [{"id": i} for i in range(5)]
+
+ def fake_jobs(_repo, run_id):
+ if run_id == 100:
+ return {"slow-job": 2700, "stable-job": 600, "new-job": 999}
+ # baseline runs
+ return {"slow-job": 1800, "stable-job": 590}
+
+ with patch.object(durations_module, "get_run_jobs",
side_effect=fake_jobs):
+ regressions = durations_module.analyze_jobs(
+ "apache/airflow",
+ latest_runs,
+ baseline_runs,
+ min_baseline_runs=5,
+ rel_threshold=0.25,
+ min_abs_increase_seconds=180,
+ )
+ names = [r["job"] for r in regressions]
+ # slow-job regressed; stable-job did not; new-job lacks baseline samples
+ assert names == ["slow-job"]
+
+
+class TestFormatSlackMessage:
+ def test_includes_channel_and_blocks(self, durations_module):
+ msg = durations_module.format_slack_message(
+ repo="apache/airflow",
+ workflow="ci-amd.yml",
+ branch="main",
+ overall_regression={
+ "latest": 2700,
+ "baseline": 1800,
+ "increase": 900,
+ "rel_increase": 0.5,
+ },
+ job_regressions=[
+ {"job": "Tests", "latest": 1500, "baseline": 1000, "increase":
500, "rel_increase": 0.5}
+ ],
+ recent_runs=[{"run_number": 102, "html_url": "https://example/2",
"duration": 2700}],
+ rel_threshold=0.25,
+ channel="internal-airflow-ci-cd",
+ )
+ assert msg["channel"] == "internal-airflow-ci-cd"
+ assert any(b["type"] == "header" for b in msg["blocks"])
+ text_blob = json.dumps(msg)
+ assert "Tests" in text_blob
+ assert "main" in msg["text"]