This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new ece82775675 [v3-2-test] Improve DB performance of datetime range 
filters in API queries (#66696) (#67102)
ece82775675 is described below

commit ece82775675cad38722717a802cfffaefd2187f0
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon May 18 13:04:00 2026 +0200

    [v3-2-test] Improve DB performance of datetime range filters in API 
queries (#66696) (#67102)
    
    * fix(api): replace COALESCE with index-friendly OR conditions in datetime 
range filters
    
    Adds NullableDatetimeRangeFilter, a RangeFilter subclass for 
start_date/end_date
    columns that emits OR predicates instead of COALESCE(col, now()), allowing
    PostgreSQL to use btree indexes on those columns.
    
    Two bugs fixed versus the original implementation in PR #66696:
    - Lower bounds now use or_(col >= x, col.is_(None)) without a now() guard,
      so future-scheduled tasks (NULL start_date) are never incorrectly 
excluded.
    - The factory dispatches on (attribute_name or filter_name), so aliased 
callers
      like datetime_range_filter_factory("dag_run_end_date", DagRun, "end_date")
      also receive NullableDatetimeRangeFilter rather than a plain RangeFilter.
    
    * fix(api): scope NullableDatetimeRangeFilter to filter_name, not 
attribute_name
    
    datetime_range_filter_factory("dag_run_start_date", DagRun, "start_date")
    passes attribute_name="start_date", so the guard
    
        if (attribute_name or filter_name) in ("start_date", "end_date"):
    
    resolved to "start_date" and incorrectly returned 
NullableDatetimeRangeFilter
    for the dag_run_start/end_date filters in the DAGs route. Those columns are
    reached via an outer join; NULL means "no run", not "currently running", so
    the OR (col IS NULL) branch inflated total_entries counts.
    
    The original COALESCE guard checked filter_name only, so 
"dag_run_start_date"
    was excluded. Revert to filter_name to preserve those semantics — only
    callers with filter_name="start_date" or "end_date" (task instances, 
dag_run,
    job routes) get NullableDatetimeRangeFilter.
    
    Fixes TestGetDags::test_get_dags failures for query_params 13/14/17/21/23.
    
    * fix(tests): correct test_aliased_*_returns_nullable_filter assertions
    
    The dag_run_start_date and dag_run_end_date filters in the DAGs route use
    an outer join, so NULL means "the DAG has no runs" — not "currently 
running".
    They must return a plain RangeFilter, not NullableDatetimeRangeFilter.
    
    Replace the two tests that incorrectly expected NullableDatetimeRangeFilter
    for aliased callers with tests that assert plain RangeFilter is returned.
    (cherry picked from commit 37667f11aa37eb27072a79b2de1d5dbec09c2218)
    
    Co-authored-by: Hemkumar Chheda <[email protected]>
---
 .../src/airflow/api_fastapi/common/parameters.py   | 56 +++++++++++++++----
 .../unit/api_fastapi/common/test_parameters.py     | 62 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 10 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py 
b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index a93ec040e1c..38134d825e7 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -898,6 +898,44 @@ class RangeFilter(BaseParam[Range]):
         )
 
 
+class NullableDatetimeRangeFilter(RangeFilter):
+    """
+    RangeFilter for nullable datetime columns (``start_date``, ``end_date``), 
rewritten for index use.
+
+    ``COALESCE(column, now())`` wraps the column in a function call that 
prevents PostgreSQL from
+    using btree indexes, forcing sequential scans on large tables. This class 
emits equivalent
+    ``OR`` predicates so each branch can be satisfied by an independent index 
scan.
+
+    NULL semantics: ``start_date=NULL`` means the task has not started yet; 
``end_date=NULL`` means
+    the task is still running. For lower bounds the NULL branch passes 
unconditionally — a not-yet-
+    started/ended task will eventually satisfy any past lower bound. For upper 
bounds the NULL branch
+    is ``col IS NULL AND now() <= x``, preserving the COALESCE(col, now()) 
semantics without the
+    function-wrap index penalty.
+    """
+
+    def to_orm(self, select: Select) -> Select:
+        if self.skip_none is False:
+            raise ValueError(f"Cannot set 'skip_none' to False on a 
{type(self)}")
+
+        if self.value is None:
+            return select
+
+        if self.value.lower_bound_gte:
+            x = self.value.lower_bound_gte
+            select = select.where(or_(self.attribute >= x, 
self.attribute.is_(None)))
+        if self.value.lower_bound_gt:
+            x = self.value.lower_bound_gt
+            select = select.where(or_(self.attribute > x, 
self.attribute.is_(None)))
+        if self.value.upper_bound_lte:
+            x = self.value.upper_bound_lte
+            select = select.where(or_(self.attribute <= x, 
and_(self.attribute.is_(None), func.now() <= x)))
+        if self.value.upper_bound_lt:
+            x = self.value.upper_bound_lt
+            select = select.where(or_(self.attribute < x, 
and_(self.attribute.is_(None), func.now() < x)))
+
+        return select
+
+
 def datetime_range_filter_factory(
     filter_name: str, model: Base, attribute_name: str | None = None
 ) -> Callable[[datetime | None, datetime | None, datetime | None, datetime | 
None], RangeFilter]:
@@ -908,17 +946,15 @@ def datetime_range_filter_factory(
         upper_bound_lt: datetime | None = Query(alias=f"{filter_name}_lt", 
default=None),
     ) -> RangeFilter:
         attr = getattr(model, attribute_name or filter_name)
-        if filter_name in ("start_date", "end_date"):
-            attr = func.coalesce(attr, func.now())
-        return RangeFilter(
-            Range(
-                lower_bound_gte=lower_bound_gte,
-                lower_bound_gt=lower_bound_gt,
-                upper_bound_lte=upper_bound_lte,
-                upper_bound_lt=upper_bound_lt,
-            ),
-            attr,
+        range_val = Range(
+            lower_bound_gte=lower_bound_gte,
+            lower_bound_gt=lower_bound_gt,
+            upper_bound_lte=upper_bound_lte,
+            upper_bound_lt=upper_bound_lt,
         )
+        if filter_name in ("start_date", "end_date"):
+            return NullableDatetimeRangeFilter(range_val, attr)
+        return RangeFilter(range_val, attr)
 
     return depends_datetime
 
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py 
b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
index 3c78a09ac61..6d5014b4cec 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
@@ -18,6 +18,7 @@
 from __future__ import annotations
 
 import re
+from datetime import datetime, timezone
 from types import SimpleNamespace
 from typing import Annotated
 
@@ -27,11 +28,14 @@ from sqlalchemy import select
 
 from airflow.api_fastapi.common.parameters import (
     FilterParam,
+    NullableDatetimeRangeFilter,
+    RangeFilter,
     SortParam,
     _PrefixPatternParam,
     _PrefixSearchParam,
     _SearchParam,
     _TaskDisplayNamePrefixPatternParam,
+    datetime_range_filter_factory,
     filter_param_factory,
 )
 from airflow.models import DagModel, DagRun, Log
@@ -335,3 +339,61 @@ class TestTaskDisplayNamePrefixPatternParam:
 
         sql = _compile(statement)
         assert "true" in sql or "1 = 1" in sql
+
+
+def _make_datetime_filter(filter_name, model=TaskInstance, 
attribute_name=None, **kwargs):
+    """Call datetime_range_filter_factory outside FastAPI by supplying None 
for all omitted bounds."""
+    defaults = dict(lower_bound_gte=None, lower_bound_gt=None, 
upper_bound_lte=None, upper_bound_lt=None)
+    defaults.update(kwargs)
+    return datetime_range_filter_factory(filter_name, model, 
attribute_name)(**defaults)
+
+
+class TestDatetimeRangeFilterFactory:
+    """datetime_range_filter_factory dispatches to NullableDatetimeRangeFilter 
for start/end dates."""
+
+    def test_start_date_returns_nullable_filter(self):
+        rf = _make_datetime_filter("start_date")
+        assert isinstance(rf, NullableDatetimeRangeFilter)
+
+    def test_end_date_returns_nullable_filter(self):
+        rf = _make_datetime_filter("end_date")
+        assert isinstance(rf, NullableDatetimeRangeFilter)
+
+    def test_aliased_filter_name_returns_plain_filter(self):
+        """dag_run_start_date uses attribute_name='start_date' via outer join; 
NULL means 'no run',
+        not 'currently running', so it must return a plain RangeFilter to 
avoid inflating counts."""
+        rf = _make_datetime_filter("dag_run_start_date", model=DagRun, 
attribute_name="start_date")
+        assert type(rf) is RangeFilter
+
+    def test_aliased_end_date_returns_plain_filter(self):
+        """dag_run_end_date uses attribute_name='end_date' via outer join; 
must return plain RangeFilter."""
+        rf = _make_datetime_filter("dag_run_end_date", model=DagRun, 
attribute_name="end_date")
+        assert type(rf) is RangeFilter
+
+    def test_other_column_returns_plain_filter(self):
+        rf = _make_datetime_filter("queued_dttm")
+        assert type(rf) is RangeFilter
+
+    def test_lower_bound_does_not_include_now(self):
+        """NULL branch on lower bounds passes unconditionally — no now() 
call."""
+        bound = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
+        rf = _make_datetime_filter("start_date", lower_bound_gte=bound)
+        sql = _compile(rf.to_orm(select(TaskInstance)))
+        assert "is null" in sql
+        assert "now()" not in sql
+        assert "coalesce" not in sql
+
+    def test_upper_bound_includes_now_for_running_tasks(self):
+        """NULL branch on upper bounds uses now() to proxy the in-progress 
task's current time."""
+        bound = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
+        rf = _make_datetime_filter("end_date", upper_bound_lte=bound)
+        sql = _compile(rf.to_orm(select(TaskInstance)))
+        assert "is null" in sql
+        assert "now()" in sql
+        assert "coalesce" not in sql
+
+    def test_no_coalesce_for_start_date(self):
+        bound = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc)
+        rf = _make_datetime_filter("start_date", upper_bound_lte=bound)
+        sql = _compile(rf.to_orm(select(TaskInstance)))
+        assert "coalesce" not in sql

Reply via email to