This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 88e61106fdc Exclude non-successful runs from AVERAGE_RUNTIME deadline 
calculation (#68647)
88e61106fdc is described below

commit 88e61106fdc468ac7f2c922e077906fc7d3fa283
Author: Sean Ghaeli <[email protected]>
AuthorDate: Wed Jun 24 09:57:30 2026 -0700

    Exclude non-successful runs from AVERAGE_RUNTIME deadline calculation 
(#68647)
    
    DeadlineReference.AVERAGE_RUNTIME computes a deadline from the average 
duration of
    past DAG runs, but the query filtered only on dag_id + start/end-date 
present — with
    no DagRun.state filter. Failed runs (which may have died fast or hung 
before failing)
    were folded into the average, skewing the computed deadline: a fast-failing 
history
    makes it too short (spurious misses), a slow-then-failed history makes it 
too long
    (real slowness never trips it).
    
    Filter the duration query to successful runs only. Add tests asserting 
failed runs
    are excluded from the average and that the deadline is skipped when too few
    successful runs exist.
    
    ---------
    
    Co-authored-by: Sean Ghaeli <[email protected]>
    Co-authored-by: Ramit Kataria <[email protected]>
---
 airflow-core/docs/howto/deadline-alerts.rst        |  6 +-
 .../airflow/serialization/definitions/deadline.py  | 11 +++-
 airflow-core/tests/unit/models/test_deadline.py    | 65 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 4 deletions(-)

diff --git a/airflow-core/docs/howto/deadline-alerts.rst 
b/airflow-core/docs/howto/deadline-alerts.rst
index e36908009a0..8e729516fef 100644
--- a/airflow-core/docs/howto/deadline-alerts.rst
+++ b/airflow-core/docs/howto/deadline-alerts.rst
@@ -110,14 +110,14 @@ Airflow provides several built-in reference points that 
you can use with Deadlin
     Specifies a fixed point in time. Useful when Dags must complete by a 
specific time.
 
 ``DeadlineReference.AVERAGE_RUNTIME``
-    Calculates deadlines based on the average runtime of previous Dag runs. 
This reference
+    Calculates deadlines based on the average runtime of previous successful 
Dag runs. This reference
     analyzes historical execution data to predict when the current run should 
complete.
     The deadline is set to the current time plus the calculated average 
runtime plus the interval.
     If insufficient historical data exists, no deadline is created.
 
     Parameters:
-        * ``max_runs`` (int, optional): Maximum number of recent Dag runs to 
analyze. Defaults to 10.
-        * ``min_runs`` (int, optional): Minimum number of completed runs 
required to calculate average. Defaults to same value as ``max_runs``.
+        * ``max_runs`` (int, optional): Maximum number of successful recent 
Dag runs to analyze. Defaults to 10.
+        * ``min_runs`` (int, optional): Minimum number of successful recent 
Dag runs required to calculate average. Defaults to same value as ``max_runs``.
 
     Example usage:
 
diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py 
b/airflow-core/src/airflow/serialization/definitions/deadline.py
index 89e231cba24..20fac2b54e8 100644
--- a/airflow-core/src/airflow/serialization/definitions/deadline.py
+++ b/airflow-core/src/airflow/serialization/definitions/deadline.py
@@ -207,6 +207,7 @@ class SerializedReferenceModels:
             from sqlalchemy import func, select, text
 
             from airflow.models import DagRun
+            from airflow.utils.state import DagRunState
 
             dag_id = kwargs["dag_id"]
 
@@ -222,9 +223,17 @@ class SerializedReferenceModels:
             else:
                 raise ValueError(f"Unsupported database dialect: {dialect}")
 
+            # Only SUCCESSFUL runs represent a "normal" runtime. A run that 
failed fast or hung
+            # before failing would otherwise skew the average and produce a 
misleading deadline
+            # (too short -> spurious misses, or too long -> real slowness 
never trips it).
             query = (
                 select(duration_expr)
-                .filter(DagRun.dag_id == dag_id, 
DagRun.start_date.isnot(None), DagRun.end_date.isnot(None))
+                .filter(
+                    DagRun.dag_id == dag_id,
+                    DagRun.state == DagRunState.SUCCESS,
+                    DagRun.start_date.isnot(None),
+                    DagRun.end_date.isnot(None),
+                )
                 .order_by(DagRun.logical_date.desc())
                 .limit(self.max_runs)
             )
diff --git a/airflow-core/tests/unit/models/test_deadline.py 
b/airflow-core/tests/unit/models/test_deadline.py
index 3635bcadcbe..4fc1aada1cc 100644
--- a/airflow-core/tests/unit/models/test_deadline.py
+++ b/airflow-core/tests/unit/models/test_deadline.py
@@ -519,6 +519,71 @@ class TestCalculatedDeadlineDatabaseCalls:
         with pytest.raises(ValueError, match="min_runs must be at least 1"):
             DeadlineReference.AVERAGE_RUNTIME(max_runs=10, min_runs=-1)
 
+    def test_average_runtime_excludes_non_successful_runs(self, session, 
dag_maker):
+        """Only SUCCESSFUL runs contribute to the average; FAILED runs must be 
ignored.
+
+        A failed run's duration is not representative of a normal runtime, so 
including it
+        would skew the computed deadline. Seed an equal mix of fast-successful 
and
+        slow-failed runs and assert the average reflects only the successful 
ones.
+        """
+        with dag_maker(DAG_ID):
+            EmptyOperator(task_id="test_task")
+
+        base_time = DEFAULT_DATE
+        success_duration = 60  # the only durations that should count
+        failed_duration = 36000  # 10h — would massively skew the average if 
(wrongly) counted
+
+        # Interleave 3 successful (60s) and 3 failed (36000s) runs.
+        specs = [
+            (DagRunState.SUCCESS, success_duration),
+            (DagRunState.FAILED, failed_duration),
+            (DagRunState.SUCCESS, success_duration),
+            (DagRunState.FAILED, failed_duration),
+            (DagRunState.SUCCESS, success_duration),
+            (DagRunState.FAILED, failed_duration),
+        ]
+        for i, (state, duration) in enumerate(specs):
+            logical_date = base_time + timedelta(days=i)
+            start_time = logical_date + timedelta(minutes=5)
+            dagrun = dag_maker.create_dagrun(logical_date=logical_date, 
run_id=f"mix_run_{i}", state=state)
+            dagrun.start_date = start_time
+            dagrun.end_date = start_time + timedelta(seconds=duration)
+
+        session.commit()
+
+        # min_runs=3 so the 3 successful runs alone satisfy the minimum.
+        reference = 
SerializedReferenceModels.AverageRuntimeDeadline(max_runs=10, min_runs=3)
+        interval = timedelta(hours=1)
+
+        with mock.patch("airflow._shared.timezones.timezone.utcnow") as 
mock_utcnow:
+            mock_utcnow.return_value = DEFAULT_DATE
+            result = reference.evaluate_with(session=session, 
interval=interval, dag_id=DAG_ID)
+
+        # Average must be over the 3 successful 60s runs only (not the 36000s 
failures).
+        expected = DEFAULT_DATE + timedelta(seconds=success_duration) + 
interval
+        assert result.replace(second=0, microsecond=0) == 
expected.replace(second=0, microsecond=0)
+
+    def test_average_runtime_skips_when_too_few_successful_runs(self, session, 
dag_maker):
+        """If only FAILED runs exist (fewer than min_runs successful), no 
deadline is created."""
+        with dag_maker(DAG_ID):
+            EmptyOperator(task_id="test_task")
+
+        base_time = DEFAULT_DATE
+        for i in range(5):
+            logical_date = base_time + timedelta(days=i)
+            start_time = logical_date + timedelta(minutes=5)
+            dagrun = dag_maker.create_dagrun(
+                logical_date=logical_date, run_id=f"failed_run_{i}", 
state=DagRunState.FAILED
+            )
+            dagrun.start_date = start_time
+            dagrun.end_date = start_time + timedelta(seconds=3600)
+
+        session.commit()
+
+        reference = 
SerializedReferenceModels.AverageRuntimeDeadline(max_runs=10, min_runs=3)
+        result = reference.evaluate_with(session=session, 
interval=timedelta(hours=1), dag_id=DAG_ID)
+        assert result is None
+
 
 class TestDeadlineReference:
     """DeadlineReference lives in definitions/deadlines.py but properly 
testing them requires DB access."""

Reply via email to