This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 34901b68ae8 Fix cursor encoding for column-form SortParam to_replace (#67973)
34901b68ae8 is described below

commit 34901b68ae88d2a2c1888077b1ac11ac727724cd
Author: GayathriSrividya <[email protected]>
AuthorDate: Thu Jun 11 14:02:41 2026 +0530

    Fix cursor encoding for column-form SortParam to_replace (#67973)
    
    * Fix cursor encoding for column-form SortParam to_replace
    
    * Fix: move uuid7 import to module level, use top-level update import
    
    ---------
    
    Co-authored-by: Gayathri Srividya Rajavarapu <[email protected]>
---
 .../src/airflow/api_fastapi/common/parameters.py   | 29 ++++----
 .../tests/unit/api_fastapi/common/test_cursors.py  | 36 ++++++++++
 .../unit/api_fastapi/common/test_parameters.py     | 24 +++++--
 .../core_api/routes/public/test_task_instances.py  | 78 ++++++++++------------
 4 files changed, 108 insertions(+), 59 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index d18f8dc7ff5..73c17551f2f 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -639,24 +639,29 @@ class SortParam(BaseParam[list[str]]):
         Extract the sort-key value for ``name`` from a result row.
 
         Resolves the accessor through ``to_replace`` for string aliases
-        (e.g. ``{"dag_run_id": "run_id"}``); otherwise reads ``name`` directly.
+        (e.g. ``{"dag_run_id": "run_id"}``). For column-form mappings
+        (e.g. ``{"run_after": DagRun.run_after}``), resolves through the
+        primary model's attribute so association proxies can still be used
+        for cursor values. Raises ``NotImplementedError`` when the model
+        exposes no such attribute rather than emitting a ``None`` cursor token.
         """
         if self.to_replace:
             replacement = self.to_replace.get(name)
             if isinstance(replacement, str):
                 return getattr(row, replacement, None)
             if replacement is not None and not isinstance(replacement, list):
-                # TODO: Column-form ``to_replace`` (e.g. ``{"last_run_state": 
DagRun.state}``)
-                # isn't supported for cursor pagination — no endpoint that 
uses cursor
-                # pagination needs it today. When one does, decide how the row 
exposes the
-                # value (projected label on the SELECT, eagerly loaded 
relationship, etc.)
-                # and wire it up here. Raising loudly so a future caller 
doesn't silently
-                # get ``None`` cursor tokens.
-                raise NotImplementedError(
-                    f"Cursor pagination does not support column-form 
``to_replace`` mapping for "
-                    f"``{name}``. Use a string alias in ``to_replace`` or sort 
by a primary-model "
-                    f"attribute."
-                )
+                # Column-form mapping resolves through the primary model's 
attribute,
+                # often an association proxy onto the joined entity
+                # (``TaskInstance.run_after`` -> ``dag_run.run_after``). Fail 
loudly if the
+                # model exposes no such attribute, rather than emitting a 
``None`` cursor token.
+                try:
+                    return getattr(row, name)
+                except AttributeError:
+                    raise NotImplementedError(
+                        f"Cursor pagination cannot resolve column-form 
``to_replace`` for "
+                        f"``{name}``: the primary model exposes no such 
attribute. Add an "
+                        f"association proxy, use a string alias, or sort by a 
primary-model column."
+                    )
             # List-form replacements are expanded in _resolve() into 
individual entries
             # each using the column's own ORM key as attr_name, so ``name`` at 
this point
             # is already a concrete model attribute (e.g. 
``_rendered_map_index`` or
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_cursors.py b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py
index 11db6ca5ba8..8bf8adbba36 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_cursors.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py
@@ -20,6 +20,7 @@ from __future__ import annotations
 import base64
 import uuid
 from datetime import datetime, timezone
+from types import SimpleNamespace
 from unittest.mock import MagicMock
 
 import msgspec
@@ -29,6 +30,7 @@ from sqlalchemy import select
 
 from airflow.api_fastapi.common.cursors import apply_cursor_filter, 
decode_cursor, encode_cursor
 from airflow.api_fastapi.common.parameters import SortParam
+from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
 
 
@@ -85,6 +87,40 @@ class TestCursorPagination:
         decoded = decode_cursor(token)
         assert decoded == ["019462ab-1234-5678-9abc-def012345678"]
 
+    def 
test_encode_cursor_with_column_form_to_replace_falls_back_to_row_attr(self):
+        """Column-form ``to_replace`` should still allow cursor encoding via 
row attribute access."""
+        sp = SortParam(["id", "run_after"], TaskInstance, {"run_after": 
DagRun.run_after})
+        sp.set_value(["run_after"])
+
+        row = SimpleNamespace(
+            run_after="2026-06-04T10:00:00+00:00",
+            id="019462ab-1234-5678-9abc-def012345678",
+        )
+
+        token = encode_cursor(row, sp)
+        decoded = decode_cursor(token)
+        assert decoded == [
+            "2026-06-04T10:00:00+00:00",
+            "019462ab-1234-5678-9abc-def012345678",
+        ]
+
+    def 
test_encode_cursor_column_form_to_replace_raises_when_attribute_absent(self):
+        """
+        ``encode_cursor`` must raise ``NotImplementedError`` (not silently 
encode ``None``)
+        when a column-form ``to_replace`` key has no corresponding attribute 
on the row object.
+        A ``None`` token would cause the next-page WHERE to compare against 
NULL and drop rows.
+        """
+        sp = SortParam(
+            ["id", "data_interval_start"], TaskInstance, 
{"data_interval_start": DagRun.data_interval_start}
+        )
+        sp.set_value(["data_interval_start"])
+
+        # Row without data_interval_start — TaskInstance does not expose this 
as an attribute.
+        row = SimpleNamespace(id="019462ab-1234-5678-9abc-def012345678")
+
+        with pytest.raises(NotImplementedError, match="data_interval_start"):
+            encode_cursor(row, sp)
+
     def test_apply_cursor_filter_wrong_value_count(self):
         sp = self._make_sort_param_with_resolved_columns(["start_date"])
         token = _msgpack_cursor_token(["only-one-value"])
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
index e2742d88a94..ecf20a47c6f 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
@@ -135,17 +135,29 @@ class TestSortParam:
         assert param.row_value(row, "dag_run_id") == "manual__2026-04-22"
         assert param.row_value(row, "id") == 42
 
-    def test_row_value_raises_on_column_form_to_replace(self):
+    def test_row_value_column_form_to_replace_resolves_via_row_attribute(self):
         """
-        Column-form ``to_replace`` is not supported by cursor encoding. The 
helper must
-        fail loudly so a future endpoint doesn't silently ship ``None`` cursor 
tokens.
+        Column-form ``to_replace`` resolves through the primary model's 
attribute so
+        association proxies (e.g. ``TaskInstance.run_after``) are usable for 
cursor encoding.
         """
         param = SortParam(["dag_id"], DagModel, {"last_run_state": 
DagRun.state}).set_value(
             ["last_run_state"]
         )
-        row = SimpleNamespace(id="test_dag")
-        with pytest.raises(NotImplementedError, match="column-form 
``to_replace``"):
-            param.row_value(row, "last_run_state")
+        row = SimpleNamespace(id="test_dag", last_run_state="success")
+        assert param.row_value(row, "last_run_state") == "success"
+
+    def 
test_row_value_column_form_to_replace_raises_when_attribute_absent(self):
+        """
+        Column-form ``to_replace`` must raise ``NotImplementedError`` (not 
return ``None``)
+        when the primary model exposes no such attribute. A ``None`` cursor 
token would cause
+        the next-page ``WHERE`` to compare against ``NULL`` and silently drop 
rows.
+        """
+        param = SortParam(
+            ["dag_id"], DagModel, {"data_interval_start": 
DagRun.data_interval_start}
+        ).set_value(["data_interval_start"])
+        row = SimpleNamespace(id="test_dag")  # deliberately no 
data_interval_start attribute
+        with pytest.raises(NotImplementedError, match="data_interval_start"):
+            param.row_value(row, "data_interval_start")
 
     def test_primary_key_is_not_duplicated_when_alias_maps_to_pk(self):
         """Sorting by an alias that resolves to the PK must not append the PK 
a second time."""
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 6edae03cb09..3627ba62d68 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -43,12 +43,12 @@ from airflow.models.dag_version import DagVersion
 from airflow.models.dagbundle import DagBundleModel
 from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
 from airflow.models.task_store import TaskStoreModel
+from airflow.models.taskinstance import uuid7
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.models.taskmap import TaskMap
 from airflow.models.team import Team
 from airflow.models.trigger import Trigger
-from airflow.providers.standard.operators.empty import EmptyOperator
-from airflow.sdk import BaseOperator, TaskGroup
+from airflow.sdk import BaseOperator
 from airflow.state.metastore import MetastoreStoreBackend
 from airflow.utils.platform import getuser
 from airflow.utils.state import DagRunState, State, TaskInstanceState
@@ -979,10 +979,6 @@ class TestGetMappedTaskInstances:
         This verifies that even when UUIDs are assigned out of map_index order
         (as happens during retries), the response is still sorted 0, 1, 2, ...
         """
-        from sqlalchemy import update as sa_update
-
-        from airflow.models.taskinstance import uuid7
-
         self.create_dag_runs_with_mapped_tasks(
             dag_maker,
             session,
@@ -994,7 +990,7 @@ class TestGetMappedTaskInstances:
         # result must still follow integer map_index order, not UUID order.
         for map_index in [1, 3]:
             session.execute(
-                sa_update(TaskInstance)
+                update(TaskInstance)
                 .where(
                     TaskInstance.dag_id == "retry_dag",
                     TaskInstance.task_id == "task_2",
@@ -2117,46 +2113,46 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
         )
         assert response.status_code == 400
 
-    def test_task_group_filter_uses_run_version_not_latest(self, test_client, 
dag_maker, session):
+    def test_cursor_pagination_order_by_run_after_roundtrips(self, 
test_client, session):
         """
-        Task group lookup should use the DAG version from the run, not the 
latest version.
+        Sorting by ``run_after`` (a column-form ``to_replace`` backed by an 
association proxy)
+        must not raise a 500 when ``has_next=true``.  Regression for
+        https://github.com/apache/airflow/issues/67970.
 
-        When a task group is renamed between versions, clicking on a 
historical run's
-        task group in the grid should still resolve correctly against the 
version
-        that run was created with — not the latest version where the group may 
have
-        a different name, i.e serialized_dag might not have that taskgroup 
anymore.
+        Verify the full cursor round-trip: the first page must include a 
``next_cursor``,
+        and following that cursor must return the remaining TIs without 
overlap.
         """
-        dag_id = "test_tg_version"
-
-        # Version 1: task group named "process_data"
-        with dag_maker(dag_id, session=session):
-            with TaskGroup(group_id="process_data"):
-                EmptyOperator(task_id="step_1")
-        dag_maker.create_dagrun(run_id="run_v1")
-        session.commit()
-
-        # Version 2: task group renamed to "process_data_v2"
-        with dag_maker(dag_id, session=session):
-            with TaskGroup(group_id="process_data_v2"):
-                EmptyOperator(task_id="step_1")
-        session.commit()
-
-        # The run was created with v1 which had "process_data".
-        # Querying with the old group name must succeed.
-        response = test_client.get(
-            f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
-            params={"task_group_id": "process_data"},
+        dag_id = "example_python_operator"
+        total_tis = 5
+        self.create_task_instances(
+            session,
+            task_instances=[
+                {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 
1))} for i in range(total_tis)
+            ],
+            dag_id=dag_id,
         )
-        assert response.status_code == 200, response.json()
-        assert response.json()["total_entries"] == 1
-        assert response.json()["task_instances"][0]["task_id"] == 
"process_data.step_1"
+        # First page — limit < total so next_cursor must be present
+        response1 = test_client.get(
+            "/dags/~/dagRuns/~/taskInstances",
+            params={"limit": 3, "order_by": ["-run_after"], "cursor": ""},
+        )
+        assert response1.status_code == 200, response1.json()
+        body1 = response1.json()
+        assert len(body1["task_instances"]) == 3
+        next_cursor = body1["next_cursor"]
+        assert next_cursor is not None, "next_cursor must be present when more 
rows exist"
 
-        # The new group name should NOT be found in the old run's version.
-        response = test_client.get(
-            f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
-            params={"task_group_id": "process_data_v2"},
+        # Second page — follow the cursor; must not 500 and must return 
remaining TIs
+        response2 = test_client.get(
+            "/dags/~/dagRuns/~/taskInstances",
+            params={"limit": 10, "order_by": ["-run_after"], "cursor": 
next_cursor},
         )
-        assert response.status_code == 404
+        assert response2.status_code == 200, response2.json()
+        body2 = response2.json()
+        ids1 = {ti["id"] for ti in body1["task_instances"]}
+        ids2 = {ti["id"] for ti in body2["task_instances"]}
+        assert ids1.isdisjoint(ids2), "Pages must not overlap"
+        assert len(ids1) + len(ids2) == total_tis
 
 
 class TestGetTaskDependencies(TestTaskInstanceEndpoint):

Reply via email to