This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 21bcabda2 [github] Add Unstable Test Reporter for CI to track flaky
tests
21bcabda2 is described below
commit 21bcabda208fd2ecad01d2b45a34e91d91fe6a1b
Author: Jark Wu <[email protected]>
AuthorDate: Sat Mar 7 18:24:48 2026 +0800
[github] Add Unstable Test Reporter for CI to track flaky tests
---
.github/workflows/unstable-test-reporter.yaml | 51 +++
tools/ci/unstable_test_reporter.py | 600 ++++++++++++++++++++++++++
2 files changed, 651 insertions(+)
diff --git a/.github/workflows/unstable-test-reporter.yaml
b/.github/workflows/unstable-test-reporter.yaml
new file mode 100644
index 000000000..169b62889
--- /dev/null
+++ b/.github/workflows/unstable-test-reporter.yaml
@@ -0,0 +1,51 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
name: Unstable Test Reporter
on:
  schedule:
    # Run at UTC 04:00 daily (after Nightly at UTC 20:00 + buffer for completion)
    - cron: "0 4 * * *"
  workflow_dispatch:
    inputs:
      lookback_hours:
        description: "Hours to look back for failed runs"
        required: false
        default: "28"

# Scopes not listed here are set to "none" once a permissions block is present,
# so the read access needed by the checkout step is requested explicitly.
permissions:
  contents: read
  issues: write
  actions: read

jobs:
  report:
    name: "Report Unstable Tests"
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v6
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      # No dependency installation step: the reporter script only imports
      # modules from the Python standard library.
      - name: Run unstable test reporter
        run: python tools/ci/unstable_test_reporter.py
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          # Scheduled runs have no inputs, so fall back to the default window.
          LOOKBACK_HOURS: ${{ inputs.lookback_hours || '28' }}
diff --git a/tools/ci/unstable_test_reporter.py
b/tools/ci/unstable_test_reporter.py
new file mode 100644
index 000000000..410f7bf6b
--- /dev/null
+++ b/tools/ci/unstable_test_reporter.py
@@ -0,0 +1,600 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Unstable Test Reporter for Apache Fluss CI.
+
+Analyzes failed GitHub Actions workflow runs on the main branch,
+parses Maven surefire test failure logs, and creates or updates
+GitHub issues for unstable (flaky) tests.
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import re
+import sys
+import time
+from dataclasses import dataclass
+from datetime import datetime, timedelta, timezone
+from typing import List, Optional, Tuple
+from urllib.request import Request, urlopen
+import urllib.request as urllib_request
+from urllib.error import HTTPError, URLError
+from urllib.parse import quote
+import json
+
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# Repository whose workflow runs are inspected and where issues are filed.
REPO_OWNER = "apache"
REPO_NAME = "fluss"
GITHUB_API = "https://api.github.com"
# Workflow definition files whose failed runs are scanned.
WORKFLOW_FILES = ["ci.yaml", "nightly.yaml"]
# Every issue managed by this script has a title starting with this prefix.
ISSUE_TITLE_PREFIX = "[test] Unstable test "
# Upper bound on the lines (header + stack trace) quoted per failure.
MAX_STACK_TRACE_LINES = 30
# Default size of the look-back window, in hours.
DEFAULT_LOOKBACK_HOURS = 28
# Default cap on new issues per execution; more than this aborts the run.
MAX_NEW_ISSUES = 5

# Regex patterns

# Leading ISO-8601 timestamp of a job log line; group 1 is the remainder.
TIMESTAMP_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z (.*)$")
# ANSI SGR (colour/style) escape sequences.
ANSI_RE = re.compile(r"\x1b\[[0-9;]*m")

# Maven surefire individual test failure line, e.g.:
#   [ERROR] org.apache.fluss...Class.method(Type)[1] Time elapsed: 1.23 s <<< FAILURE!
# or:
#   Error: org.apache.fluss...Class.method Time elapsed: 1.23 s <<< FAILURE!
# Groups: 1 = fully qualified class, 2 = method name, 3 = FAILURE or ERROR.
TEST_FAILURE_RE = re.compile(
    r"^(?:\[ERROR\]|Error:)\s+(\S+)\.(\w+)(?:\(.*?\))?(?:\[.*?\])?\s+.*<<<\s+(FAILURE|ERROR)!"
)

# Stack trace continuation patterns: "at" frames, "Caused by:", "... N more",
# "Suppressed:", or a line starting with an exception-like class name.
STACK_LINE_RE = re.compile(
    r"^(?:\tat |Caused by: |\t\.\.\. \d+ more|\tSuppressed: "
    r"|[\w.$]+(?:Exception|Error|Throwable|Failure))"
)
+
+
+# ---------------------------------------------------------------------------
+# Data structures
+# ---------------------------------------------------------------------------
+
+
@dataclass
class TestFailure:
    """A single failing test case extracted from a job log.

    Attributes:
        class_name: Fully qualified class, e.g. org.apache.fluss...ClassName.
        method_name: Test method, e.g. testSomething.
        simple_class_name: Class name without its package, e.g. ClassName.
        error_message: The failure header line followed by its stack trace.
        job_url: URL of the job the failure was found in.
        run_id: Identifier of the workflow run.
        job_id: Identifier of the job within that run.
    """

    class_name: str
    method_name: str
    simple_class_name: str
    error_message: str
    job_url: str
    run_id: int
    job_id: int
+
+
+# ---------------------------------------------------------------------------
+# GitHub API helpers
+# ---------------------------------------------------------------------------
+
+
class GitHubAPI:
    """Thin wrapper around GitHub REST API using urllib.

    Requests are authenticated with the token given at construction time.
    Rate-limit responses, server errors and network errors are retried a
    bounded number of times; once retries are exhausted the last error is
    raised instead of being turned into an empty result.
    """

    # Seconds to wait when rate limited without a usable Retry-After value.
    DEFAULT_RATE_LIMIT_WAIT = 60
    # Minimum spacing, in seconds, between two search API calls.
    SEARCH_MIN_INTERVAL = 2

    def __init__(self, token: str):
        self.token = token
        self.base = GITHUB_API
        self._search_last_call = 0.0  # rate-limit search API

    def _auth_headers(self) -> dict:
        """Return a fresh dict with the headers sent on every API request."""
        return {
            "Authorization": f"token {self.token}",
            "Accept": "application/vnd.github+json",
            "X-GitHub-Api-Version": "2022-11-28",
        }

    def _url(self, path: str) -> str:
        """Prefix API-relative paths (starting with "/") with the base URL."""
        return f"{self.base}{path}" if path.startswith("/") else path

    @staticmethod
    def _rate_limit_wait(headers) -> Optional[int]:
        """Return how long to wait for a rate-limited response, in seconds.

        Returns None when the headers do not indicate rate limiting, i.e.
        there is no Retry-After header and X-RateLimit-Remaining is not "0".
        A missing or non-integer Retry-After value falls back to
        DEFAULT_RATE_LIMIT_WAIT.
        """
        retry_after = headers.get("Retry-After")
        if not retry_after and headers.get("X-RateLimit-Remaining") != "0":
            return None
        try:
            # Clamp so a negative value can never reach time.sleep().
            return max(0, int(retry_after))
        except (TypeError, ValueError):
            return GitHubAPI.DEFAULT_RATE_LIMIT_WAIT

    def _request(
        self, method: str, url: str, body: Optional[dict] = None, max_retries: int = 3
    ) -> dict | bytes | list:
        """Send one API request, retrying transient failures.

        Args:
            method: HTTP method, e.g. "GET" or "POST".
            url: Absolute URL to request.
            body: Optional JSON-serialisable payload.
            max_retries: Maximum number of attempts.

        Returns:
            The decoded JSON document when the response is JSON, otherwise
            the raw response bytes.

        Raises:
            HTTPError: For non-retryable statuses, or when the final attempt
                still fails with a rate-limit or server error.
            URLError: When the final attempt fails with a network error.
        """
        headers = self._auth_headers()
        data = None
        if body is not None:
            headers["Content-Type"] = "application/json"
            data = json.dumps(body).encode()

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                req = Request(url, data=data, headers=headers, method=method)
                with urlopen(req, timeout=60) as resp:
                    raw = resp.read()
                    content_type = resp.headers.get("Content-Type", "")
                if "application/json" in content_type:
                    return json.loads(raw)
                return raw
            except HTTPError as e:
                if e.code in (403, 429):
                    wait = self._rate_limit_wait(e.headers)
                    if wait is None and e.code == 429:
                        # 429 is always a rate-limit signal, even without hints.
                        wait = self.DEFAULT_RATE_LIMIT_WAIT
                    if wait is None:
                        body_text = e.read().decode(errors="replace")
                        log.error("Permission error (403): %s", body_text)
                        raise
                    if is_last_attempt:
                        # Raising here keeps callers from mistaking an
                        # exhausted retry budget for an empty result.
                        log.error(
                            "Still rate limited after %d attempt(s), giving up",
                            max_retries,
                        )
                        raise
                    log.warning("Rate limited. Waiting %d seconds...", wait)
                    time.sleep(wait)
                    continue
                if e.code == 422:
                    body_text = e.read().decode(errors="replace")
                    log.error("Unprocessable entity (422): %s", body_text)
                    raise
                if e.code >= 500 and not is_last_attempt:
                    wait = 2 ** (attempt + 1)
                    log.warning("Server error %d, retrying in %ds...", e.code, wait)
                    time.sleep(wait)
                    continue
                raise
            except URLError:
                if is_last_attempt:
                    raise
                wait = 2 ** (attempt + 1)
                log.warning("Network error, retrying in %ds...", wait)
                time.sleep(wait)
        # Only reachable when max_retries < 1, i.e. no attempt was made.
        return {}

    def get(self, path: str) -> dict | list:
        """GET an API-relative path or absolute URL."""
        return self._request("GET", self._url(path))

    def post(self, path: str, body: dict) -> dict:
        """POST a JSON body to an API-relative path or absolute URL."""
        return self._request("POST", self._url(path), body=body)

    def search_issues(self, query: str) -> list:
        """Run an issue search and return the first page (up to 30) of items."""
        # Rate-limit search API to avoid secondary limits
        elapsed = time.time() - self._search_last_call
        if elapsed < self.SEARCH_MIN_INTERVAL:
            time.sleep(self.SEARCH_MIN_INTERVAL - elapsed)
        self._search_last_call = time.time()
        encoded = quote(query)
        resp = self.get(f"/search/issues?q={encoded}&per_page=30")
        return resp.get("items", []) if isinstance(resp, dict) else []

    def download_log(self, job_id: int) -> Optional[str]:
        """Download job log. Returns log text or None if unavailable.

        The GitHub API responds with a 302 redirect to a storage URL.
        We must NOT forward the Authorization header to the redirect target,
        so we handle the redirect manually.

        Raises:
            HTTPError: For HTTP failures other than 404/410.
        """
        url = f"{self.base}/repos/{REPO_OWNER}/{REPO_NAME}/actions/jobs/{job_id}/logs"

        # Build an opener that does NOT auto-follow redirects
        class NoRedirect(urllib_request.HTTPRedirectHandler):
            def redirect_request(self, req, fp, code, msg, headers, newurl):
                return None  # stop auto-redirect

        opener = urllib_request.build_opener(NoRedirect)
        req = Request(url, headers=self._auth_headers(), method="GET")
        try:
            try:
                with opener.open(req, timeout=120) as resp:
                    return resp.read().decode(errors="replace")
            except HTTPError as e:
                redirect_url = (
                    e.headers.get("Location")
                    if e.code in (301, 302, 303, 307, 308)
                    else None
                )
                if not redirect_url:
                    raise
                # Follow redirect WITHOUT auth header
                with urlopen(Request(redirect_url, method="GET"), timeout=120) as resp2:
                    return resp2.read().decode(errors="replace")
        except HTTPError as e:
            # 404: log not found; 410 is assumed to mean the log has expired.
            if e.code in (404, 410):
                log.warning("Log for job %d not available (%d)", job_id, e.code)
                return None
            raise
        except URLError as e:
            log.warning("Failed to download log for job %d: %s", job_id, e)
            return None
+
+
+# ---------------------------------------------------------------------------
+# Fetching workflow runs and jobs
+# ---------------------------------------------------------------------------
+
+
def get_failed_runs(
    api: GitHubAPI, workflow_file: str, since: datetime
) -> List[dict]:
    """Return failed runs of one workflow on main created at or after *since*.

    Only the first page (up to 100 runs) of the listing is requested; runs
    older than *since* are dropped on the client side.
    """
    endpoint = (
        f"/repos/{REPO_OWNER}/{REPO_NAME}/actions/workflows/{workflow_file}/runs"
        "?branch=main&status=failure&per_page=100"
    )
    payload = api.get(endpoint)
    candidates = payload.get("workflow_runs", []) if isinstance(payload, dict) else []

    def created_at(run: dict) -> datetime:
        # Rewrite the trailing "Z" as an explicit UTC offset before parsing.
        return datetime.fromisoformat(run["created_at"].replace("Z", "+00:00"))

    recent = [run for run in candidates if created_at(run) >= since]
    log.info(
        "Workflow %s: found %d failed runs in lookback window",
        workflow_file,
        len(recent),
    )
    return recent
+
+
def get_failed_jobs(api: GitHubAPI, run_id: int) -> List[dict]:
    """Return the jobs of workflow run *run_id* whose conclusion is "failure".

    Only the first page (up to 100 jobs) of the run is requested.
    """
    payload = api.get(
        f"/repos/{REPO_OWNER}/{REPO_NAME}/actions/runs/{run_id}/jobs?per_page=100"
    )
    if not isinstance(payload, dict):
        return []
    return list(
        filter(
            lambda job: job.get("conclusion") == "failure",
            payload.get("jobs", []),
        )
    )
+
+
+# ---------------------------------------------------------------------------
+# Log parsing
+# ---------------------------------------------------------------------------
+
+
def strip_timestamp(line: str) -> str:
    """Drop the leading log timestamp from *line*, if it has one."""
    matched = TIMESTAMP_RE.match(line)
    if matched is None:
        return line
    return matched.group(1)
+
+
def strip_ansi(text: str) -> str:
    """Return *text* with every match of ANSI_RE removed."""
    return re.sub(ANSI_RE, "", text)
+
+
def parse_test_failures(
    log_text: str, job_url: str, run_id: int, job_id: int
) -> List[TestFailure]:
    """Extract Maven surefire test failures from a GitHub Actions job log.

    ANSI colour codes and per-line timestamps are stripped first. Each line
    matching TEST_FAILURE_RE starts a failure; the stack-trace-like lines
    directly after it are attached as its error message, capped at
    MAX_STACK_TRACE_LINES lines.
    """

    def is_trace_line(text: str) -> bool:
        # Exception names, "at" frames, "Caused by:", "... N more",
        # "Suppressed:" or any tab-indented line.
        if STACK_LINE_RE.match(text) or text.startswith("\t"):
            return True
        # Space-indented "at ..." frame.
        return len(text) > 1 and text[0] == " " and text.strip().startswith("at ")

    cleaned = [strip_timestamp(raw) for raw in strip_ansi(log_text).splitlines()]
    total = len(cleaned)

    found: List[TestFailure] = []
    pos = 0
    while pos < total:
        header = cleaned[pos]
        hit = TEST_FAILURE_RE.match(header)
        pos += 1
        if hit is None:
            continue

        qualified, test_method = hit.group(1), hit.group(2)
        # Everything after the last dot is the simple class name.
        short_name = qualified.rsplit(".", 1)[-1]

        # Consume the contiguous run of stack trace lines after the header.
        end = pos
        while end < total and is_trace_line(cleaned[end]):
            end += 1
        report = [header, *cleaned[pos:end]]
        pos = end

        if len(report) > MAX_STACK_TRACE_LINES:
            report = report[:MAX_STACK_TRACE_LINES] + ["... (truncated)"]

        found.append(
            TestFailure(
                class_name=qualified,
                method_name=test_method,
                simple_class_name=short_name,
                error_message="\n".join(report),
                job_url=job_url,
                run_id=run_id,
                job_id=job_id,
            )
        )

    return found
+
+
+# ---------------------------------------------------------------------------
+# Issue management
+# ---------------------------------------------------------------------------
+
+
def build_issue_title(simple_class: str, method: str) -> str:
    """Return the issue title for a test: the shared prefix plus Class.method."""
    return "{}{}.{}".format(ISSUE_TITLE_PREFIX, simple_class, method)
+
+
def build_issue_body(job_url: str, error_message: str) -> str:
    """Return Markdown with the job URL followed by the fenced error message."""
    fence = "```"
    return f"{job_url}\n\n{fence}\n{error_message}\n{fence}\n"
+
+
def find_existing_issue(
    api: GitHubAPI, simple_class: str, method: str
) -> Optional[int]:
    """Look up the open issue tracking this test.

    Returns the number of the first search hit whose title, ignoring
    surrounding whitespace, equals the expected title, or None when no hit
    matches exactly.
    """
    wanted = build_issue_title(simple_class, method)
    search = (
        f"repo:{REPO_OWNER}/{REPO_NAME} is:issue is:open "
        f'in:title "{wanted}"'
    )
    exact_hits = (
        hit["number"]
        for hit in api.search_issues(search)
        if hit.get("title", "").strip() == wanted
    )
    return next(exact_hits, None)
+
+
def is_already_commented(
    api: GitHubAPI, issue_number: int, job_url: str
) -> bool:
    """Check if a comment with this job_url already exists on the issue.

    Comments are fetched page by page (100 per page, at most 10 pages) so
    that issues with many comments are still checked. A comment whose body
    is missing or null is treated as empty text.

    This is best effort: any error while fetching is logged as a warning and
    reported as "not commented" (False).
    """
    per_page = 100
    max_pages = 10  # hard stop so a misbehaving API cannot loop forever
    base_path = (
        f"/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue_number}/comments"
        f"?per_page={per_page}&sort=created&direction=desc"
    )
    try:
        for page in range(1, max_pages + 1):
            comments = api.get(f"{base_path}&page={page}")
            if not isinstance(comments, list):
                return False
            # "body" can be present but null, so .get() with a default is
            # not enough to guarantee a string here.
            if any(job_url in (c.get("body") or "") for c in comments):
                return True
            if len(comments) < per_page:
                return False  # short page: nothing left to fetch
    except Exception as e:
        log.warning("Failed to check comments on issue #%d: %s", issue_number, e)
    return False
+
+
def is_already_in_issue_body(
    api: GitHubAPI, issue_number: int, job_url: str
) -> bool:
    """Check if the job_url already exists in the issue body.

    An issue whose body is missing or null is treated as having an empty
    body. This is best effort: any error while fetching the issue is logged
    as a warning and reported as False.
    """
    path = f"/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue_number}"
    try:
        issue = api.get(path)
    except Exception as e:
        log.warning("Failed to fetch issue #%d: %s", issue_number, e)
        return False
    if not isinstance(issue, dict):
        return False
    # "body" can be present but null, so fall back to an empty string.
    return job_url in (issue.get("body") or "")
+
+
def create_issue(
    api: GitHubAPI, title: str, body: str, dry_run: bool = False
) -> Optional[int]:
    """Create a new issue and return its number.

    Returns None in dry-run mode, when the request fails (HTTP or network
    error), or when the response does not contain an issue number. Failures
    are logged rather than raised so one bad request does not abort the
    caller's remaining work.
    """
    if dry_run:
        log.info("[DRY RUN] Would create issue: %s", title)
        return None
    try:
        resp = api.post(
            f"/repos/{REPO_OWNER}/{REPO_NAME}/issues",
            {"title": title, "body": body},
        )
    except HTTPError as e:
        log.error("Failed to create issue '%s': HTTP %d", title, e.code)
        return None
    except URLError as e:
        # Network-level failure: there is no HTTP status to report.
        log.error("Failed to create issue '%s': %s", title, e.reason)
        return None

    number = resp.get("number") if isinstance(resp, dict) else None
    if number is None:
        log.error("Failed to create issue '%s': unexpected API response", title)
        return None
    log.info("Created issue #%s: %s", number, title)
    return number
+
+
def add_comment(
    api: GitHubAPI, issue_number: int, body: str, dry_run: bool = False
) -> None:
    """Post *body* as a new comment on issue *issue_number*.

    Does nothing except log in dry-run mode. HTTP and network failures are
    logged rather than raised so one bad request does not abort the caller's
    remaining work.
    """
    if dry_run:
        log.info("[DRY RUN] Would comment on issue #%d", issue_number)
        return
    try:
        api.post(
            f"/repos/{REPO_OWNER}/{REPO_NAME}/issues/{issue_number}/comments",
            {"body": body},
        )
    except HTTPError as e:
        log.error("Failed to comment on issue #%d: HTTP %d", issue_number, e.code)
    except URLError as e:
        # Network-level failure: there is no HTTP status to report.
        log.error("Failed to comment on issue #%d: %s", issue_number, e.reason)
    else:
        log.info("Added comment to issue #%d", issue_number)
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+
+
def _env_int(name: str, default: int) -> int:
    """Read an integer environment variable; unset, blank or invalid gives *default*."""
    raw = os.environ.get(name, "").strip()
    if not raw:
        return default
    try:
        return int(raw)
    except ValueError:
        log.warning("Ignoring invalid %s=%r, using default %d", name, raw, default)
        return default


def _plan_action(api: GitHubAPI, failure: TestFailure) -> Optional[tuple]:
    """Decide what to do for one failure.

    Returns ("create", title, body), ("comment", issue_number, body), or None
    when the failure is already reported or the issue search failed.
    """
    title = build_issue_title(failure.simple_class_name, failure.method_name)
    body = build_issue_body(failure.job_url, failure.error_message)

    try:
        existing = find_existing_issue(
            api, failure.simple_class_name, failure.method_name
        )
    except Exception as e:
        log.error("Failed to search issues for %s: %s", title, e)
        return None

    if not existing:
        return ("create", title, body)

    # Check if already reported with this job URL
    if is_already_commented(
        api, existing, failure.job_url
    ) or is_already_in_issue_body(api, existing, failure.job_url):
        log.info(
            " Already reported on issue #%d for %s",
            existing,
            failure.job_url,
        )
        return None
    return ("comment", existing, body)


def _collect_actions(
    api: GitHubAPI, since: datetime
) -> Tuple[List[tuple], int, int]:
    """Phase 1: scan failed runs and collect the actions to perform.

    Returns (actions, total_runs, total_failures) where total_failures counts
    unique (class, method) pairs seen during this execution.
    """
    actions: List[tuple] = []
    seen: set = set()  # dedup within this execution
    total_runs = 0
    total_failures = 0

    for wf_file in WORKFLOW_FILES:
        try:
            runs = get_failed_runs(api, wf_file, since)
        except Exception as e:
            log.error("Failed to fetch runs for %s: %s", wf_file, e)
            continue

        for run in runs:
            total_runs += 1
            run_id = run["id"]
            log.info(
                "Processing run #%d (workflow=%s, created=%s)",
                run_id,
                wf_file,
                run["created_at"],
            )

            try:
                jobs = get_failed_jobs(api, run_id)
            except Exception as e:
                log.error("Failed to fetch jobs for run %d: %s", run_id, e)
                continue

            for job in jobs:
                job_id = job["id"]
                job_url = job.get("html_url", "")
                job_name = job.get("name", "")
                log.info(" Processing job: %s (id=%d)", job_name, job_id)

                # A failed download must only skip this job, not abort the
                # whole report.
                try:
                    log_text = api.download_log(job_id)
                except Exception as e:
                    log.error(" Failed to download log for job %d: %s", job_id, e)
                    continue
                if not log_text:
                    log.warning(" No log available for job %d, skipping", job_id)
                    continue

                failures = parse_test_failures(log_text, job_url, run_id, job_id)
                if not failures:
                    log.info(" No test failures found in job log")
                    continue

                log.info(" Found %d test failure(s)", len(failures))

                for failure in failures:
                    key = (failure.simple_class_name, failure.method_name)
                    if key in seen:
                        log.info(
                            " Skipping duplicate: %s.%s",
                            failure.simple_class_name,
                            failure.method_name,
                        )
                        continue
                    seen.add(key)
                    total_failures += 1

                    action = _plan_action(api, failure)
                    if action is not None:
                        actions.append(action)

    return actions, total_runs, total_failures


def main():
    """Report unstable tests from recent failed CI runs as GitHub issues.

    Environment variables:
        GITHUB_TOKEN: Required API token.
        LOOKBACK_HOURS: Size of the look-back window (default 28).
        MAX_NEW_ISSUES: Cap on new issues per execution (default 5).
        DRY_RUN: "true", "1" or "yes" to only log what would be done.

    Exits with status 1 when the token is missing or when more new issues
    would be created than MAX_NEW_ISSUES allows.
    """
    token = os.environ.get("GITHUB_TOKEN")
    if not token:
        log.error("GITHUB_TOKEN environment variable is required")
        sys.exit(1)

    lookback_hours = _env_int("LOOKBACK_HOURS", DEFAULT_LOOKBACK_HOURS)
    max_new_issues = _env_int("MAX_NEW_ISSUES", MAX_NEW_ISSUES)
    dry_run = os.environ.get("DRY_RUN", "false").lower() in ("true", "1", "yes")

    if dry_run:
        log.info("Running in DRY RUN mode - no issues will be created or commented on")

    api = GitHubAPI(token)
    since = datetime.now(timezone.utc) - timedelta(hours=lookback_hours)
    log.info("Looking back %d hours (since %s)", lookback_hours, since.isoformat())

    # Phase 1: Collect all actions (create issue or add comment)
    # Each entry: ("create", title, body) or ("comment", issue_number, body)
    actions, total_runs, total_failures = _collect_actions(api, since)

    # Phase 2: Rate limit check and execute actions
    new_issue_count = sum(1 for a in actions if a[0] == "create")

    if new_issue_count > max_new_issues:
        log.error(
            "Rate limit exceeded: would create %d new issues (max %d). "
            "Skipping all issue creation and commenting. "
            "The issues that would have been created:",
            new_issue_count,
            max_new_issues,
        )
        for action in actions:
            if action[0] == "create":
                log.error(" - %s", action[1])
            else:
                log.error(" - Comment on issue #%d", action[1])
        log.error(
            "This likely indicates a systemic CI problem rather than "
            "individual flaky tests. Please investigate manually."
        )
        sys.exit(1)

    issues_created = 0
    comments_added = 0
    for kind, target, body in actions:
        try:
            if kind == "create":
                # Count only issues that were really created, so failures
                # and dry runs do not inflate the summary.
                if create_issue(api, target, body, dry_run) is not None:
                    issues_created += 1
            else:
                add_comment(api, target, body, dry_run)
                # add_comment reports errors only through the log, so the
                # dry-run case is the only non-comment that can be excluded.
                if not dry_run:
                    comments_added += 1
        except Exception as e:
            # Keep going: one failed action must not block the others.
            log.error("Action %s failed for %s: %s", kind, target, e)

    log.info(
        "Done. Processed %d run(s), found %d unique failure(s), "
        "created %d issue(s), added %d comment(s).",
        total_runs,
        total_failures,
        issues_created,
        comments_added,
    )


if __name__ == "__main__":
    main()