This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 04d974e46fa [v3-2-test] Add daily E2E flaky test report workflow with 
Slack notifications (#64446) (#64496)
04d974e46fa is described below

commit 04d974e46fac832990c0c79e1f8c36d9f7a3d5c2
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Mar 30 21:57:14 2026 +0200

    [v3-2-test] Add daily E2E flaky test report workflow with Slack 
notifications (#64446) (#64496)
    
    Add infrastructure to track and report on flaky UI E2E tests:
    
    - Add extract_e2e_test_results.py to parse Playwright JSON results
      and produce structured failure/fixme test data as artifacts
    - Modify ui-e2e-tests.yml to run extraction and upload artifacts
      after each E2E test run
    - Add analyze_e2e_flaky_tests.py to aggregate failures across
      multiple CI runs and identify flaky test candidates
    - Add e2e-flaky-tests-report.yml scheduled workflow (daily at
      midnight UTC) that analyzes last 10 runs and posts a formatted
      report to #airflow-3-ui Slack channel
    - Add unit tests for both scripts (25 tests)
    (cherry picked from commit a1b9485731f54afd255a70352295da7b855a7abb)
    
    Co-authored-by: Jarek Potiuk <[email protected]>
---
 .github/workflows/e2e-flaky-tests-report.yml      |  67 +++
 .github/workflows/ui-e2e-tests.yml                |  17 +
 scripts/ci/analyze_e2e_flaky_tests.py             | 574 ++++++++++++++++++++++
 scripts/ci/extract_e2e_test_results.py            | 203 ++++++++
 scripts/tests/ci/test_analyze_e2e_flaky_tests.py  | 176 +++++++
 scripts/tests/ci/test_extract_e2e_test_results.py | 203 ++++++++
 6 files changed, 1240 insertions(+)

diff --git a/.github/workflows/e2e-flaky-tests-report.yml 
b/.github/workflows/e2e-flaky-tests-report.yml
new file mode 100644
index 00000000000..be10b33b1c6
--- /dev/null
+++ b/.github/workflows/e2e-flaky-tests-report.yml
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+---
+name: "E2E Flaky Tests Report"
+on:  # yamllint disable-line rule:truthy
+  schedule:
+    - cron: '0 0 * * *'
+  workflow_dispatch:
+permissions:
+  contents: read
+  actions: read
+env:
+  GITHUB_REPOSITORY: ${{ github.repository }}
+  GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+  SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }}
+
+jobs:
+
+  analyze-flaky-tests:
+    name: "Analyze E2E flaky tests"
+    runs-on: ubuntu-latest
+    steps:
+      - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )"
+        uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd  # 
v6.0.2
+        with:
+          persist-credentials: false
+
+      - name: "Analyze E2E test results"
+        id: analyze
+        run: python3 scripts/ci/analyze_e2e_flaky_tests.py
+        env:
+          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+          MAX_RUNS: "10"
+          WORKFLOW_NAME: "ci-amd-arm.yml"
+          BRANCH: "main"
+          OUTPUT_FILE: "slack-message.json"
+
+      - name: "Post report to Slack"
+        if: always() && steps.analyze.outcome == 'success'
+        uses: 
slackapi/slack-github-action@af78098f536edbc4de71162a307590698245be95  # v3.0.1
+        with:
+          method: chat.postMessage
+          token: ${{ env.SLACK_BOT_TOKEN }}
+          payload-file-path: "slack-message.json"
+
+      - name: "Upload analysis results"
+        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f 
 # v7.0.0
+        with:
+          name: "e2e-flaky-test-analysis"
+          path: slack-message.json
+          retention-days: 14
+        if: always()
diff --git a/.github/workflows/ui-e2e-tests.yml 
b/.github/workflows/ui-e2e-tests.yml
index fe4e3603552..e2028f7c75a 100644
--- a/.github/workflows/ui-e2e-tests.yml
+++ b/.github/workflows/ui-e2e-tests.yml
@@ -157,3 +157,20 @@ jobs:
           retention-days: 7
           if-no-files-found: 'warn'
         if: always()
+      - name: "Extract E2E test failures and fixme tests"
+        run: python3 scripts/ci/extract_e2e_test_results.py
+        env:
+          RESULTS_JSON: "airflow-core/src/airflow/ui/test-results/results.json"
+          OUTPUT_DIR: "e2e-test-report"
+          BROWSER: "${{ env.BROWSER }}"
+          RUN_ID: "${{ github.run_id }}"
+          RUN_ATTEMPT: "${{ github.run_attempt }}"
+        if: always()
+      - name: "Upload E2E test report"
+        uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f 
 # v7.0.0
+        with:
+          name: "e2e-test-report-${{ env.BROWSER }}"
+          path: e2e-test-report/
+          retention-days: 14
+          if-no-files-found: 'warn'
+        if: always()
diff --git a/scripts/ci/analyze_e2e_flaky_tests.py 
b/scripts/ci/analyze_e2e_flaky_tests.py
new file mode 100755
index 00000000000..bf5aef53ffa
--- /dev/null
+++ b/scripts/ci/analyze_e2e_flaky_tests.py
@@ -0,0 +1,574 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# /// script
+# requires-python = ">=3.10"
+# ///
+"""
+Analyze E2E test results across multiple workflow runs to identify flaky tests.
+
+Downloads e2e-test-report artifacts from recent workflow runs, aggregates
+failure data, and produces a Slack-formatted report.
+
+The script queries GitHub Actions for recent runs of the UI E2E test workflows,
+downloads the test report artifacts, and analyzes failure patterns to identify
+tests that are good candidates for marking with test.fixme.
+
+Environment variables (required):
+  GITHUB_REPOSITORY  - Owner/repo (e.g. apache/airflow)
+  GITHUB_TOKEN       - GitHub token for API access
+
+Environment variables (optional):
+  MAX_RUNS           - Maximum number of workflow runs to analyze (default: 10)
+  WORKFLOW_NAME      - Workflow file name to query (default: ci-amd-arm.yml)
+  BRANCH             - Branch to filter runs (default: main)
+  OUTPUT_FILE        - Path for the Slack message output (default: 
slack-message.json)
+  GITHUB_OUTPUT      - Path to GitHub Actions output file
+  GITHUB_STEP_SUMMARY - Path to GitHub Actions step summary file
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import subprocess
+import sys
+import tempfile
+from collections import defaultdict
+from pathlib import Path
+
+BROWSERS = ["chromium", "firefox", "webkit"]
+# A test failing in >= this fraction of runs is considered a flaky candidate
+FLAKY_THRESHOLD = 0.3
+
+
+def escape_slack_mrkdwn(text: str) -> str:
+    """Escape special characters for Slack mrkdwn format."""
+    text = text.replace("&", "&amp;")
+    text = text.replace("<", "&lt;")
+    text = text.replace(">", "&gt;")
+    return text
+
+
+def gh_api(endpoint: str, **kwargs: str) -> str | None:
+    """Call GitHub API via gh CLI."""
+    cmd = ["gh", "api", endpoint]
+    for key, value in kwargs.items():
+        cmd.extend(["-f", f"{key}={value}"])
+    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
+    if result.returncode != 0:
+        print(f"gh api error for {endpoint}: {result.stderr}", file=sys.stderr)
+        return None
+    return result.stdout.strip()
+
+
+def get_recent_run_ids(repo: str, workflow: str, branch: str, max_runs: int) 
-> list[dict]:
+    """Get recent workflow run IDs and metadata."""
+    output = gh_api(
+        f"repos/{repo}/actions/workflows/{workflow}/runs",
+        branch=branch,
+        per_page=str(max_runs),
+        status="completed",
+    )
+    if not output:
+        return []
+    try:
+        data = json.loads(output)
+    except json.JSONDecodeError:
+        return []
+
+    runs = []
+    for run in data.get("workflow_runs", []):
+        runs.append(
+            {
+                "id": run["id"],
+                "run_number": run.get("run_number", "?"),
+                "created_at": run.get("created_at", ""),
+                "conclusion": run.get("conclusion", "unknown"),
+                "html_url": run.get("html_url", ""),
+            }
+        )
+    return runs[:max_runs]
+
+
+def download_artifact(repo: str, run_id: int, artifact_name: str, dest_dir: 
Path) -> bool:
+    """Download a specific artifact from a workflow run."""
+    result = subprocess.run(
+        [
+            "gh",
+            "run",
+            "download",
+            str(run_id),
+            "--name",
+            artifact_name,
+            "--dir",
+            str(dest_dir),
+            "--repo",
+            repo,
+        ],
+        capture_output=True,
+        text=True,
+        check=False,
+    )
+    if result.returncode != 0:
+        # Artifact may not exist for this run (e.g. tests were skipped)
+        return False
+    return True
+
+
+def load_json_safe(path: Path) -> dict | None:
+    """Load JSON file, returning None on failure."""
+    if not path.exists():
+        return None
+    try:
+        return json.loads(path.read_text())
+    except json.JSONDecodeError:
+        return None
+
+
+def analyze_runs(repo: str, runs: list[dict]) -> tuple[dict[str, dict], 
dict[str, dict], int]:
+    """Download and analyze test reports from multiple runs.
+
+    Returns:
+        - failures_by_test: {test_name: {count, browsers, errors, runs}}
+        - fixme_tests: {test_name: {spec_file, annotations}}
+        - runs_with_data: number of runs that had downloadable data
+    """
+    failures_by_test: dict[str, dict] = defaultdict(
+        lambda: {"count": 0, "browsers": set(), "errors": [], "run_ids": []}
+    )
+    fixme_tests: dict[str, dict] = {}
+    runs_with_data = 0
+
+    with tempfile.TemporaryDirectory() as tmpdir:
+        for run in runs:
+            run_id = run["id"]
+            run_has_data = False
+
+            for browser in BROWSERS:
+                artifact_name = f"e2e-test-report-{browser}"
+                dest = Path(tmpdir) / str(run_id) / browser
+                dest.mkdir(parents=True, exist_ok=True)
+
+                if not download_artifact(repo, run_id, artifact_name, dest):
+                    continue
+
+                run_has_data = True
+
+                # Process failures
+                failures_data = load_json_safe(dest / "failures.json")
+                if failures_data:
+                    for failure in failures_data.get("failures", []):
+                        test_name = failure.get("test", "unknown")
+                        entry = failures_by_test[test_name]
+                        entry["count"] += 1
+                        entry["browsers"].add(browser)
+                        if failure.get("error") and len(entry["errors"]) < 3:
+                            entry["errors"].append(
+                                {
+                                    "error": failure["error"],
+                                    "browser": browser,
+                                    "run_id": run_id,
+                                }
+                            )
+                        if run_id not in entry["run_ids"]:
+                            entry["run_ids"].append(run_id)
+
+                # Process fixme tests (same across browsers, but collect once)
+                fixme_data = load_json_safe(dest / "fixme_tests.json")
+                if fixme_data:
+                    for fixme in fixme_data.get("fixme_tests", []):
+                        test_name = fixme.get("test", "unknown")
+                        if test_name not in fixme_tests:
+                            fixme_tests[test_name] = {
+                                "spec_file": fixme.get("spec_file", "unknown"),
+                                "annotations": fixme.get("annotations", []),
+                            }
+
+            if run_has_data:
+                runs_with_data += 1
+
+    # Convert sets to sorted lists for JSON serialization
+    for entry in failures_by_test.values():
+        entry["browsers"] = sorted(entry["browsers"])
+
+    return dict(failures_by_test), fixme_tests, runs_with_data
+
+
+def identify_flaky_candidates(failures_by_test: dict[str, dict], 
runs_with_data: int) -> list[dict]:
+    """Identify tests that are good candidates for marking as fixme."""
+    if runs_with_data == 0:
+        return []
+
+    candidates = []
+    for test_name, data in failures_by_test.items():
+        # Use runs where this test appeared rather than total runs * browsers,
+        # since not every test runs in every browser or every run.
+        runs_seen = len(data["run_ids"]) if data["run_ids"] else 1
+        failure_rate = data["count"] / (runs_seen * len(data["browsers"])) if 
data["browsers"] else 0
+        if failure_rate >= FLAKY_THRESHOLD:
+            candidates.append(
+                {
+                    "test": test_name,
+                    "failure_rate": round(failure_rate * 100, 1),
+                    "failure_count": data["count"],
+                    "browsers": data["browsers"],
+                    "errors": data["errors"][:2],
+                }
+            )
+    # Sort by failure rate descending
+    candidates.sort(key=lambda c: c["failure_rate"], reverse=True)
+    return candidates
+
+
+def format_slack_message(
+    failures_by_test: dict[str, dict],
+    fixme_tests: dict[str, dict],
+    flaky_candidates: list[dict],
+    runs: list[dict],
+    runs_with_data: int,
+    repo: str,
+) -> dict:
+    """Format the analysis results as a Slack Block Kit message."""
+    total_runs = len(runs)
+    total_unique_failures = len(failures_by_test)
+    total_fixme = len(fixme_tests)
+
+    blocks: list[dict] = []
+
+    # Header
+    blocks.append(
+        {
+            "type": "header",
+            "text": {
+                "type": "plain_text",
+                "text": "UI E2E Flaky Test Report",
+            },
+        }
+    )
+
+    # Summary section
+    summary_lines = [
+        f"*Runs analyzed:* {runs_with_data}/{total_runs}",
+        f"*Unique test failures:* {total_unique_failures}",
+        f"*Flaky candidates (>={int(FLAKY_THRESHOLD * 100)}% failure rate):* 
{len(flaky_candidates)}",
+        f"*Tests marked fixme:* {total_fixme}",
+    ]
+    blocks.append(
+        {
+            "type": "section",
+            "text": {"type": "mrkdwn", "text": "\n".join(summary_lines)},
+        }
+    )
+    blocks.append({"type": "divider"})
+
+    # Flaky candidates section
+    if flaky_candidates:
+        blocks.append(
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": "*Flaky Test Candidates* (consider marking with 
`test.fixme`):",
+                },
+            }
+        )
+        for candidate in flaky_candidates[:10]:
+            error_summary = ""
+            if candidate["errors"]:
+                first_error = 
escape_slack_mrkdwn(candidate["errors"][0]["error"])
+                # Truncate for Slack display
+                if len(first_error) > 150:
+                    first_error = first_error[:150] + "..."
+                error_summary = f"\n>  _Error: {first_error}_"
+
+            test_name = escape_slack_mrkdwn(candidate["test"])
+            browsers_str = ", ".join(candidate["browsers"])
+            blocks.append(
+                {
+                    "type": "section",
+                    "text": {
+                        "type": "mrkdwn",
+                        "text": (
+                            f"*{test_name}*\n"
+                            f"  Failure rate: *{candidate['failure_rate']}%* "
+                            f"({candidate['failure_count']} failures) | "
+                            f"Browsers: {browsers_str}"
+                            f"{error_summary}"
+                        ),
+                    },
+                }
+            )
+        if len(flaky_candidates) > 10:
+            blocks.append(
+                {
+                    "type": "context",
+                    "elements": [
+                        {
+                            "type": "mrkdwn",
+                            "text": f"_...and {len(flaky_candidates) - 10} 
more flaky candidates_",
+                        }
+                    ],
+                }
+            )
+    else:
+        blocks.append(
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": "*No flaky test candidates detected* — all tests 
are stable or already marked.",
+                },
+            }
+        )
+
+    blocks.append({"type": "divider"})
+
+    # All failures table
+    if failures_by_test:
+        blocks.append(
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": "*All Test Failures* (across last runs):",
+                },
+            }
+        )
+        # Sort by count descending
+        sorted_failures = sorted(failures_by_test.items(), key=lambda x: 
x[1]["count"], reverse=True)
+        failure_lines = []
+        for test_name, data in sorted_failures[:15]:
+            escaped_name = escape_slack_mrkdwn(test_name)
+            browsers_str = ", ".join(data["browsers"])
+            error_hint = ""
+            if data["errors"]:
+                short_err = escape_slack_mrkdwn(data["errors"][0]["error"])
+                if len(short_err) > 100:
+                    short_err = short_err[:100] + "..."
+                error_hint = f"\n>  _{short_err}_"
+            failure_lines.append(
+                f"• *{escaped_name}* — {data['count']}x failures | 
{browsers_str}{error_hint}"
+            )
+        # Slack has a 3000 char limit per text block; split if needed
+        text = "\n".join(failure_lines)
+        if len(text) > 2900:
+            text = text[:2900] + "\n_...truncated_"
+        blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": 
text}})
+        if len(sorted_failures) > 15:
+            blocks.append(
+                {
+                    "type": "context",
+                    "elements": [
+                        {
+                            "type": "mrkdwn",
+                            "text": f"_...and {len(sorted_failures) - 15} more 
failing tests_",
+                        }
+                    ],
+                }
+            )
+    else:
+        blocks.append(
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": "*No test failures detected* in the analyzed 
runs.",
+                },
+            }
+        )
+
+    blocks.append({"type": "divider"})
+
+    # Fixme tests section
+    if fixme_tests:
+        blocks.append(
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": "*Tests Marked `fixme`* (waiting for fixes):",
+                },
+            }
+        )
+        fixme_lines = []
+        for test_name, data in sorted(fixme_tests.items()):
+            escaped_name = escape_slack_mrkdwn(test_name)
+            spec = escape_slack_mrkdwn(data.get("spec_file", "unknown"))
+            fixme_lines.append(f"• {escaped_name} (`{spec}`)")
+        text = "\n".join(fixme_lines)
+        if len(text) > 2900:
+            text = text[:2900] + "\n_...truncated_"
+        blocks.append({"type": "section", "text": {"type": "mrkdwn", "text": 
text}})
+    else:
+        blocks.append(
+            {
+                "type": "section",
+                "text": {
+                    "type": "mrkdwn",
+                    "text": "*No tests are currently marked `fixme`.*",
+                },
+            }
+        )
+
+    # Footer with link
+    if runs:
+        latest_run = runs[0]
+        blocks.append(
+            {
+                "type": "context",
+                "elements": [
+                    {
+                        "type": "mrkdwn",
+                        "text": (
+                            f"<https://github.com/{repo}/actions|View all 
workflow runs> | "
+                            f"Latest: <{latest_run['html_url']}|Run 
#{latest_run['run_number']}>"
+                        ),
+                    }
+                ],
+            }
+        )
+
+    # Build the plain text fallback
+    fallback = (
+        f"UI E2E Flaky Test Report: "
+        f"{total_unique_failures} unique failures, "
+        f"{len(flaky_candidates)} flaky candidates, "
+        f"{total_fixme} fixme tests "
+        f"(from {runs_with_data} runs)"
+    )
+
+    return {
+        "channel": "airflow-3-ui",
+        "text": fallback,
+        "blocks": blocks,
+    }
+
+
+def write_step_summary(
+    failures_by_test: dict[str, dict],
+    fixme_tests: dict[str, dict],
+    flaky_candidates: list[dict],
+    runs_with_data: int,
+    total_runs: int,
+) -> None:
+    """Write a GitHub Actions step summary in markdown."""
+    summary_path = os.environ.get("GITHUB_STEP_SUMMARY")
+    if not summary_path:
+        return
+
+    lines = [
+        "## UI E2E Flaky Test Report",
+        "",
+        "| Metric | Value |",
+        "|--------|-------|",
+        f"| Runs analyzed | {runs_with_data}/{total_runs} |",
+        f"| Unique test failures | {len(failures_by_test)} |",
+        f"| Flaky candidates | {len(flaky_candidates)} |",
+        f"| Tests marked fixme | {len(fixme_tests)} |",
+        "",
+    ]
+
+    if flaky_candidates:
+        lines.append("### Flaky Candidates")
+        lines.append("")
+        lines.append("| Test | Failure Rate | Count | Browsers |")
+        lines.append("|------|-------------|-------|----------|")
+        for c in flaky_candidates[:20]:
+            browsers = ", ".join(c["browsers"])
+            lines.append(f"| {c['test']} | {c['failure_rate']}% | 
{c['failure_count']} | {browsers} |")
+        lines.append("")
+
+    if failures_by_test:
+        lines.append("### All Failures")
+        lines.append("")
+        lines.append("| Test | Failures | Browsers | Error |")
+        lines.append("|------|----------|----------|-------|")
+        sorted_failures = sorted(failures_by_test.items(), key=lambda x: 
x[1]["count"], reverse=True)
+        for test_name, data in sorted_failures[:20]:
+            browsers = ", ".join(data["browsers"])
+            error = data["errors"][0]["error"][:100] + "..." if data["errors"] 
else ""
+            error = error.replace("|", "\\|")
+            lines.append(f"| {test_name} | {data['count']} | {browsers} | 
{error} |")
+        lines.append("")
+
+    if fixme_tests:
+        lines.append("### Fixme Tests")
+        lines.append("")
+        for test_name, data in sorted(fixme_tests.items()):
+            lines.append(f"- {test_name} (`{data.get('spec_file', 
'unknown')}`)")
+        lines.append("")
+
+    with open(summary_path, "a") as f:
+        f.write("\n".join(lines))
+
+
+def main() -> None:
+    repo = os.environ.get("GITHUB_REPOSITORY", "apache/airflow")
+    max_runs = int(os.environ.get("MAX_RUNS", "10"))
+    workflow = os.environ.get("WORKFLOW_NAME", "ci-amd-arm.yml")
+    branch = os.environ.get("BRANCH", "main")
+    output_file = Path(os.environ.get("OUTPUT_FILE", "slack-message.json"))
+
+    print(f"Analyzing E2E test results for {repo} ({workflow} on {branch})")
+    print(f"Looking at up to {max_runs} recent runs...")
+
+    # Get recent runs
+    runs = get_recent_run_ids(repo, workflow, branch, max_runs)
+    if not runs:
+        print("No workflow runs found.")
+        sys.exit(0)
+
+    print(f"Found {len(runs)} runs to analyze.")
+
+    # Download and analyze
+    failures_by_test, fixme_tests, runs_with_data = analyze_runs(repo, runs)
+
+    print(f"Runs with data: {runs_with_data}/{len(runs)}")
+    print(f"Unique failing tests: {len(failures_by_test)}")
+    print(f"Fixme tests: {len(fixme_tests)}")
+
+    # Identify flaky candidates
+    flaky_candidates = identify_flaky_candidates(failures_by_test, 
runs_with_data)
+    print(f"Flaky candidates: {len(flaky_candidates)}")
+
+    # Format Slack message
+    slack_message = format_slack_message(
+        failures_by_test, fixme_tests, flaky_candidates, runs, runs_with_data, 
repo
+    )
+    output_file.write_text(json.dumps(slack_message, indent=2))
+    print(f"Slack message written to: {output_file}")
+
+    # Write step summary
+    write_step_summary(failures_by_test, fixme_tests, flaky_candidates, 
runs_with_data, len(runs))
+
+    # Set outputs for GitHub Actions
+    github_output = os.environ.get("GITHUB_OUTPUT")
+    if github_output:
+        has_failures = len(failures_by_test) > 0
+        has_flaky = len(flaky_candidates) > 0
+        with open(github_output, "a") as f:
+            f.write(f"has-failures={str(has_failures).lower()}\n")
+            f.write(f"has-flaky-candidates={str(has_flaky).lower()}\n")
+            f.write(f"unique-failures={len(failures_by_test)}\n")
+            f.write(f"flaky-candidates={len(flaky_candidates)}\n")
+            f.write(f"fixme-tests={len(fixme_tests)}\n")
+            f.write(f"runs-analyzed={runs_with_data}\n")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/ci/extract_e2e_test_results.py 
b/scripts/ci/extract_e2e_test_results.py
new file mode 100755
index 00000000000..b9ccd42f434
--- /dev/null
+++ b/scripts/ci/extract_e2e_test_results.py
@@ -0,0 +1,203 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# /// script
+# requires-python = ">=3.10"
+# ///
+"""
+Extract E2E test results from Playwright JSON report.
+
+Reads the Playwright JSON results file and produces two output files:
+  - failures.json: list of failed tests with short error summaries
+  - fixme_tests.json: list of tests marked with test.fixme and their status
+
+Environment variables:
+  RESULTS_JSON   - Path to Playwright results.json (default: 
test-results/results.json)
+  OUTPUT_DIR     - Directory for output files (default: e2e-test-report)
+  BROWSER        - Browser name to include in output (default: unknown)
+  RUN_ID         - GitHub Actions run ID (default: unknown)
+  RUN_ATTEMPT    - GitHub Actions run attempt (default: 1)
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import sys
+from pathlib import Path
+
+MAX_ERROR_LENGTH = 300
+
+
+def truncate_error(message: str) -> str:
+    """Truncate error message to a reasonable length for display."""
+    message = message.strip().replace("\n", " ").replace("\r", "")
+    if len(message) > MAX_ERROR_LENGTH:
+        return message[:MAX_ERROR_LENGTH] + "..."
+    return message
+
+
+def extract_test_title(spec: dict) -> str:
+    """Build full test title from spec file and test name."""
+    spec_file = spec.get("location", {}).get("file", "unknown")
+    title_parts = spec.get("titlePath", [])
+    # titlePath is typically ["", "spec description", "test name"]
+    # Filter empty parts and join
+    title = " > ".join(part for part in title_parts if part)
+    return f"{spec_file}: {title}" if title else spec_file
+
+
+def extract_failures(suites: list[dict]) -> list[dict]:
+    """Extract failed tests from Playwright JSON suites."""
+    failures = []
+    for suite in suites:
+        failures.extend(_extract_failures_from_suite(suite))
+    return failures
+
+
+def _extract_failures_from_suite(suite: dict) -> list[dict]:
+    """Recursively extract failures from a suite and its children."""
+    failures = []
+    for spec in suite.get("specs", []):
+        for test in spec.get("tests", []):
+            if test.get("status") == "expected":
+                continue
+            # Check if any result actually failed
+            for result in test.get("results", []):
+                if result.get("status") in ("failed", "timedOut"):
+                    error_message = ""
+                    if result.get("error", {}).get("message"):
+                        error_message = 
truncate_error(result["error"]["message"])
+                    elif result.get("errors"):
+                        messages = [e.get("message", "") for e in 
result["errors"] if e.get("message")]
+                        error_message = truncate_error("; ".join(messages))
+
+                    failures.append(
+                        {
+                            "test": extract_test_title(spec),
+                            "status": result.get("status", "failed"),
+                            "error": error_message,
+                            "spec_file": spec.get("file", suite.get("file", 
"unknown")),
+                            "retry": result.get("retry", 0),
+                        }
+                    )
+                    break  # Only report first failure per test
+
+    for child_suite in suite.get("suites", []):
+        failures.extend(_extract_failures_from_suite(child_suite))
+    return failures
+
+
+def extract_fixme_tests(suites: list[dict]) -> list[dict]:
+    """Extract tests marked with test.fixme from Playwright JSON suites."""
+    fixme_tests = []
+    for suite in suites:
+        fixme_tests.extend(_extract_fixme_from_suite(suite))
+    return fixme_tests
+
+
+def _extract_fixme_from_suite(suite: dict) -> list[dict]:
+    """Recursively extract fixme tests from a suite and its children."""
+    fixme_tests = []
+    for spec in suite.get("specs", []):
+        for test in spec.get("tests", []):
+            # In Playwright JSON, fixme tests have status "skipped" and
+            # annotations with type "fixme"
+            annotations = test.get("annotations", [])
+            is_fixme = any(a.get("type") == "fixme" for a in annotations)
+            if is_fixme:
+                fixme_tests.append(
+                    {
+                        "test": extract_test_title(spec),
+                        "status": test.get("status", "skipped"),
+                        "spec_file": spec.get("file", suite.get("file", 
"unknown")),
+                        "annotations": [
+                            {"type": a.get("type", ""), "description": 
a.get("description", "")}
+                            for a in annotations
+                        ],
+                    }
+                )
+
+    for child_suite in suite.get("suites", []):
+        fixme_tests.extend(_extract_fixme_from_suite(child_suite))
+    return fixme_tests
+
+
+def main() -> None:
+    results_path = Path(os.environ.get("RESULTS_JSON", 
"test-results/results.json"))
+    output_dir = Path(os.environ.get("OUTPUT_DIR", "e2e-test-report"))
+    browser = os.environ.get("BROWSER", "unknown")
+    run_id = os.environ.get("RUN_ID", "unknown")
+    run_attempt = os.environ.get("RUN_ATTEMPT", "1")
+
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    if not results_path.exists():
+        print(f"Results file not found: {results_path}")
+        print("Writing empty results (no test data available).")
+        metadata = {
+            "browser": browser,
+            "run_id": run_id,
+            "run_attempt": run_attempt,
+            "has_results": False,
+        }
+        (output_dir / "failures.json").write_text(
+            json.dumps({"metadata": metadata, "failures": []}, indent=2)
+        )
+        (output_dir / "fixme_tests.json").write_text(
+            json.dumps({"metadata": metadata, "fixme_tests": []}, indent=2)
+        )
+        return
+
+    try:
+        report = json.loads(results_path.read_text())
+    except json.JSONDecodeError as e:
+        print(f"ERROR: Invalid JSON in {results_path}: {e}", file=sys.stderr)
+        sys.exit(1)
+
+    suites = report.get("suites", [])
+    failures = extract_failures(suites)
+    fixme_tests = extract_fixme_tests(suites)
+
+    metadata = {
+        "browser": browser,
+        "run_id": run_id,
+        "run_attempt": run_attempt,
+        "has_results": True,
+        "total_tests": report.get("stats", {}).get("expected", 0)
+        + report.get("stats", {}).get("unexpected", 0)
+        + report.get("stats", {}).get("skipped", 0),
+        "total_passed": report.get("stats", {}).get("expected", 0),
+        "total_failed": report.get("stats", {}).get("unexpected", 0),
+        "total_skipped": report.get("stats", {}).get("skipped", 0),
+    }
+
+    failures_output = {"metadata": metadata, "failures": failures}
+    fixme_output = {"metadata": metadata, "fixme_tests": fixme_tests}
+
+    (output_dir / "failures.json").write_text(json.dumps(failures_output, 
indent=2))
+    (output_dir / "fixme_tests.json").write_text(json.dumps(fixme_output, 
indent=2))
+
+    print(f"Browser: {browser}")
+    print(f"Total tests: {metadata['total_tests']}")
+    print(f"Failures: {len(failures)}")
+    print(f"Fixme tests: {len(fixme_tests)}")
+    print(f"Output written to: {output_dir}")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/scripts/tests/ci/test_analyze_e2e_flaky_tests.py 
b/scripts/tests/ci/test_analyze_e2e_flaky_tests.py
new file mode 100644
index 00000000000..ff7097c7aee
--- /dev/null
+++ b/scripts/tests/ci/test_analyze_e2e_flaky_tests.py
@@ -0,0 +1,176 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib.util
+import sys
+from pathlib import Path
+
+import pytest
+
+MODULE_PATH = Path(__file__).resolve().parents[3] / "scripts" / "ci" / 
"analyze_e2e_flaky_tests.py"
+
+
[email protected]
+def analyze_module():
+    module_name = "test_analyze_e2e_module"
+    sys.modules.pop(module_name, None)
+    spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH)
+    assert spec is not None
+    assert spec.loader is not None
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+class TestEscapeSlackMrkdwn:
+    def test_escapes_ampersand(self, analyze_module):
+        assert analyze_module.escape_slack_mrkdwn("a & b") == "a &amp; b"
+
+    def test_escapes_angle_brackets(self, analyze_module):
+        assert analyze_module.escape_slack_mrkdwn("<div>") == "&lt;div&gt;"
+
+    def test_plain_text_unchanged(self, analyze_module):
+        assert analyze_module.escape_slack_mrkdwn("hello world") == "hello 
world"
+
+    def test_all_special_chars(self, analyze_module):
+        result = analyze_module.escape_slack_mrkdwn("a < b & c > d")
+        assert result == "a &lt; b &amp; c &gt; d"
+
+
+class TestIdentifyFlakyCandidates:
+    def test_empty_failures(self, analyze_module):
+        assert analyze_module.identify_flaky_candidates({}, 5) == []
+
+    def test_zero_runs(self, analyze_module):
+        assert analyze_module.identify_flaky_candidates({"test": {}}, 0) == []
+
+    def test_identifies_flaky_test(self, analyze_module):
+        failures = {
+            "test_a": {
+                "count": 5,
+                "browsers": ["chromium", "firefox"],
+                "errors": [{"error": "timeout", "browser": "chromium", 
"run_id": 1}],
+                "run_ids": [1, 2, 3],
+            },
+        }
+        candidates = analyze_module.identify_flaky_candidates(failures, 5)
+        assert len(candidates) == 1
+        assert candidates[0]["test"] == "test_a"
+        assert candidates[0]["failure_rate"] > 0
+
+    def test_excludes_low_failure_rate(self, analyze_module):
+        failures = {
+            "test_a": {
+                "count": 1,
+                "browsers": ["chromium", "firefox", "webkit"],
+                "errors": [],
+                "run_ids": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            },
+        }
+        candidates = analyze_module.identify_flaky_candidates(failures, 10)
+        assert len(candidates) == 0
+
+    def test_sorted_by_failure_rate(self, analyze_module):
+        failures = {
+            "test_low": {
+                "count": 3,
+                "browsers": ["chromium"],
+                "errors": [],
+                "run_ids": [1, 2, 3],
+            },
+            "test_high": {
+                "count": 10,
+                "browsers": ["chromium"],
+                "errors": [],
+                "run_ids": [1, 2, 3],
+            },
+        }
+        candidates = analyze_module.identify_flaky_candidates(failures, 10)
+        if len(candidates) >= 2:
+            assert candidates[0]["failure_rate"] >= 
candidates[1]["failure_rate"]
+
+
+class TestFormatSlackMessage:
+    def test_empty_results(self, analyze_module):
+        msg = analyze_module.format_slack_message(
+            failures_by_test={},
+            fixme_tests={},
+            flaky_candidates=[],
+            runs=[],
+            runs_with_data=0,
+            repo="apache/airflow",
+        )
+        assert msg["channel"] == "airflow-3-ui"
+        assert "text" in msg
+        assert "blocks" in msg
+        assert any("No flaky test candidates" in str(b) for b in msg["blocks"])
+        assert any("No test failures" in str(b) for b in msg["blocks"])
+
+    def test_with_failures_and_fixme(self, analyze_module):
+        failures = {
+            "test_a": {
+                "count": 3,
+                "browsers": ["chromium"],
+                "errors": [{"error": "timeout error", "browser": "chromium", 
"run_id": 1}],
+                "run_ids": [1, 2],
+            },
+        }
+        fixme = {
+            "test_b": {"spec_file": "test.spec.ts", "annotations": [{"type": 
"fixme"}]},
+        }
+        candidates = [
+            {
+                "test": "test_a",
+                "failure_rate": 50.0,
+                "failure_count": 3,
+                "browsers": ["chromium"],
+                "errors": [{"error": "timeout error"}],
+            }
+        ]
+        runs = [{"id": 1, "run_number": 100, "html_url": 
"https://example.com"}]
+        msg = analyze_module.format_slack_message(failures, fixme, candidates, 
runs, 1, "apache/airflow")
+        assert msg["channel"] == "airflow-3-ui"
+        blocks_text = str(msg["blocks"])
+        assert "test_a" in blocks_text
+        assert "test_b" in blocks_text
+        assert "50.0%" in blocks_text
+
+    def test_special_chars_in_test_names(self, analyze_module):
+        failures = {
+            "test <with> & special": {
+                "count": 3,
+                "browsers": ["chromium"],
+                "errors": [{"error": "err <msg>", "browser": "chromium", 
"run_id": 1}],
+                "run_ids": [1],
+            },
+        }
+        candidates = [
+            {
+                "test": "test <with> & special",
+                "failure_rate": 100.0,
+                "failure_count": 3,
+                "browsers": ["chromium"],
+                "errors": [{"error": "err <msg>"}],
+            }
+        ]
+        msg = analyze_module.format_slack_message(failures, {}, candidates, 
[], 1, "apache/airflow")
+        blocks_text = str(msg["blocks"])
+        # Angle brackets and ampersands should be escaped
+        assert "&lt;" in blocks_text
+        assert "&gt;" in blocks_text
+        assert "&amp;" in blocks_text
diff --git a/scripts/tests/ci/test_extract_e2e_test_results.py 
b/scripts/tests/ci/test_extract_e2e_test_results.py
new file mode 100644
index 00000000000..35a50d1f51d
--- /dev/null
+++ b/scripts/tests/ci/test_extract_e2e_test_results.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import importlib.util
+import json
+import sys
+from pathlib import Path
+
+import pytest
+
+MODULE_PATH = Path(__file__).resolve().parents[3] / "scripts" / "ci" / 
"extract_e2e_test_results.py"
+
+
[email protected]
+def extract_module():
+    module_name = "test_extract_e2e_module"
+    sys.modules.pop(module_name, None)
+    spec = importlib.util.spec_from_file_location(module_name, MODULE_PATH)
+    assert spec is not None
+    assert spec.loader is not None
+    module = importlib.util.module_from_spec(spec)
+    spec.loader.exec_module(module)
+    return module
+
+
+SAMPLE_PLAYWRIGHT_REPORT = {
+    "stats": {"expected": 5, "unexpected": 2, "skipped": 1},
+    "suites": [
+        {
+            "title": "Test Suite",
+            "file": "tests/e2e/specs/home.spec.ts",
+            "specs": [
+                {
+                    "title": "should load home page",
+                    "file": "tests/e2e/specs/home.spec.ts",
+                    "location": {"file": "tests/e2e/specs/home.spec.ts"},
+                    "titlePath": ["", "Home", "should load home page"],
+                    "tests": [
+                        {
+                            "status": "unexpected",
+                            "annotations": [],
+                            "results": [
+                                {
+                                    "status": "failed",
+                                    "retry": 0,
+                                    "error": {"message": "Expected visible but 
element was hidden"},
+                                }
+                            ],
+                        }
+                    ],
+                },
+                {
+                    "title": "should display dashboard",
+                    "file": "tests/e2e/specs/home.spec.ts",
+                    "location": {"file": "tests/e2e/specs/home.spec.ts"},
+                    "titlePath": ["", "Home", "should display dashboard"],
+                    "tests": [
+                        {
+                            "status": "expected",
+                            "annotations": [],
+                            "results": [{"status": "passed", "retry": 0}],
+                        }
+                    ],
+                },
+            ],
+            "suites": [
+                {
+                    "title": "Nested Suite",
+                    "file": "tests/e2e/specs/home.spec.ts",
+                    "specs": [
+                        {
+                            "title": "should handle fixme test",
+                            "file": "tests/e2e/specs/home.spec.ts",
+                            "location": {"file": 
"tests/e2e/specs/home.spec.ts"},
+                            "titlePath": ["", "Home", "Nested Suite", "should 
handle fixme test"],
+                            "tests": [
+                                {
+                                    "status": "skipped",
+                                    "annotations": [{"type": "fixme", 
"description": ""}],
+                                    "results": [],
+                                }
+                            ],
+                        }
+                    ],
+                    "suites": [],
+                }
+            ],
+        }
+    ],
+}
+
+
+class TestTruncateError:
+    def test_short_message_unchanged(self, extract_module):
+        assert extract_module.truncate_error("short error") == "short error"
+
+    def test_long_message_truncated(self, extract_module):
+        long_msg = "x" * 400
+        result = extract_module.truncate_error(long_msg)
+        assert len(result) == 303  # 300 + "..."
+        assert result.endswith("...")
+
+    def test_strips_whitespace_and_newlines(self, extract_module):
+        assert extract_module.truncate_error("  error\nmessage\r\n  ") == 
"error message"
+
+
+class TestExtractTestTitle:
+    def test_with_title_path(self, extract_module):
+        spec = {
+            "location": {"file": "test.spec.ts"},
+            "titlePath": ["", "Suite", "test name"],
+        }
+        assert extract_module.extract_test_title(spec) == "test.spec.ts: Suite 
> test name"
+
+    def test_without_title_path(self, extract_module):
+        spec = {"location": {"file": "test.spec.ts"}, "titlePath": []}
+        assert extract_module.extract_test_title(spec) == "test.spec.ts"
+
+    def test_missing_location(self, extract_module):
+        spec = {"titlePath": ["", "Suite", "test"]}
+        assert extract_module.extract_test_title(spec) == "unknown: Suite > 
test"
+
+
+class TestExtractFailures:
+    def test_extracts_failed_tests(self, extract_module):
+        failures = 
extract_module.extract_failures(SAMPLE_PLAYWRIGHT_REPORT["suites"])
+        assert len(failures) == 1
+        assert failures[0]["status"] == "failed"
+        assert "visible" in failures[0]["error"]
+        assert failures[0]["spec_file"] == "tests/e2e/specs/home.spec.ts"
+
+    def test_skips_passing_tests(self, extract_module):
+        failures = 
extract_module.extract_failures(SAMPLE_PLAYWRIGHT_REPORT["suites"])
+        test_names = [f["test"] for f in failures]
+        assert not any("dashboard" in name for name in test_names)
+
+    def test_empty_suites(self, extract_module):
+        assert extract_module.extract_failures([]) == []
+
+
+class TestExtractFixmeTests:
+    def test_extracts_fixme_tests(self, extract_module):
+        fixme = 
extract_module.extract_fixme_tests(SAMPLE_PLAYWRIGHT_REPORT["suites"])
+        assert len(fixme) == 1
+        assert "fixme" in fixme[0]["test"]
+        assert fixme[0]["status"] == "skipped"
+        assert any(a["type"] == "fixme" for a in fixme[0]["annotations"])
+
+    def test_empty_suites(self, extract_module):
+        assert extract_module.extract_fixme_tests([]) == []
+
+
+class TestMainFunction:
+    def test_missing_results_file_writes_empty(self, extract_module, tmp_path, 
monkeypatch):
+        output_dir = tmp_path / "output"
+        monkeypatch.setenv("RESULTS_JSON", str(tmp_path / "nonexistent.json"))
+        monkeypatch.setenv("OUTPUT_DIR", str(output_dir))
+        monkeypatch.setenv("BROWSER", "chromium")
+        monkeypatch.setenv("RUN_ID", "12345")
+
+        extract_module.main()
+
+        failures = json.loads((output_dir / "failures.json").read_text())
+        assert failures["failures"] == []
+        assert failures["metadata"]["has_results"] is False
+
+        fixme = json.loads((output_dir / "fixme_tests.json").read_text())
+        assert fixme["fixme_tests"] == []
+
+    def test_valid_results_file(self, extract_module, tmp_path, monkeypatch):
+        results_file = tmp_path / "results.json"
+        results_file.write_text(json.dumps(SAMPLE_PLAYWRIGHT_REPORT))
+        output_dir = tmp_path / "output"
+
+        monkeypatch.setenv("RESULTS_JSON", str(results_file))
+        monkeypatch.setenv("OUTPUT_DIR", str(output_dir))
+        monkeypatch.setenv("BROWSER", "firefox")
+        monkeypatch.setenv("RUN_ID", "99999")
+
+        extract_module.main()
+
+        failures = json.loads((output_dir / "failures.json").read_text())
+        assert len(failures["failures"]) == 1
+        assert failures["metadata"]["browser"] == "firefox"
+        assert failures["metadata"]["has_results"] is True
+
+        fixme = json.loads((output_dir / "fixme_tests.json").read_text())
+        assert len(fixme["fixme_tests"]) == 1

Reply via email to