This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 34901b68ae8 Fix cursor encoding for column-form SortParam to_replace
(#67973)
34901b68ae8 is described below
commit 34901b68ae88d2a2c1888077b1ac11ac727724cd
Author: GayathriSrividya <[email protected]>
AuthorDate: Thu Jun 11 14:02:41 2026 +0530
Fix cursor encoding for column-form SortParam to_replace (#67973)
* Fix cursor encoding for column-form SortParam to_replace
* Fix: move uuid7 import to module level, use top-level update import
---------
Co-authored-by: Gayathri Srividya Rajavarapu
<[email protected]>
---
.../src/airflow/api_fastapi/common/parameters.py | 29 ++++----
.../tests/unit/api_fastapi/common/test_cursors.py | 36 ++++++++++
.../unit/api_fastapi/common/test_parameters.py | 24 +++++--
.../core_api/routes/public/test_task_instances.py | 78 ++++++++++------------
4 files changed, 108 insertions(+), 59 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py
index d18f8dc7ff5..73c17551f2f 100644
--- a/airflow-core/src/airflow/api_fastapi/common/parameters.py
+++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py
@@ -639,24 +639,31 @@ class SortParam(BaseParam[list[str]]):
         Extract the sort-key value for ``name`` from a result row.
 
         Resolves the accessor through ``to_replace`` for string aliases
-        (e.g. ``{"dag_run_id": "run_id"}``); otherwise reads ``name`` directly.
+        (e.g. ``{"dag_run_id": "run_id"}``). For column-form mappings
+        (e.g. ``{"run_after": DagRun.run_after}``), resolves through the
+        primary model's attribute so association proxies can still be used
+        for cursor values. Raises ``NotImplementedError`` when the model
+        exposes no such attribute rather than emitting a ``None`` cursor token.
         """
         if self.to_replace:
             replacement = self.to_replace.get(name)
             if isinstance(replacement, str):
                 return getattr(row, replacement, None)
             if replacement is not None and not isinstance(replacement, list):
-                # TODO: Column-form ``to_replace`` (e.g. ``{"last_run_state": DagRun.state}``)
-                # isn't supported for cursor pagination — no endpoint that uses cursor
-                # pagination needs it today. When one does, decide how the row exposes the
-                # value (projected label on the SELECT, eagerly loaded relationship, etc.)
-                # and wire it up here. Raising loudly so a future caller doesn't silently
-                # get ``None`` cursor tokens.
-                raise NotImplementedError(
-                    f"Cursor pagination does not support column-form ``to_replace`` mapping for "
-                    f"``{name}``. Use a string alias in ``to_replace`` or sort by a primary-model "
-                    f"attribute."
-                )
+                # Column-form mapping resolves through the primary model's attribute,
+                # often an association proxy onto the joined entity
+                # (``TaskInstance.run_after`` -> ``dag_run.run_after``). Fail loudly if the
+                # model exposes no such attribute, rather than emitting a ``None`` cursor token.
+                try:
+                    return getattr(row, name)
+                except AttributeError as err:
+                    # Chain ``err`` so an AttributeError raised *inside* a proxy or property
+                    # (rather than a genuinely missing attribute) stays visible in tracebacks.
+                    raise NotImplementedError(
+                        f"Cursor pagination cannot resolve column-form ``to_replace`` for "
+                        f"``{name}``: the primary model exposes no such attribute. Add an "
+                        f"association proxy, use a string alias, or sort by a primary-model column."
+                    ) from err
             # List-form replacements are expanded in _resolve() into individual entries
             # each using the column's own ORM key as attr_name, so ``name`` at this point
             # is already a concrete model attribute (e.g. ``_rendered_map_index`` or
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_cursors.py
b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py
index 11db6ca5ba8..8bf8adbba36 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_cursors.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_cursors.py
@@ -20,6 +20,7 @@ from __future__ import annotations
import base64
import uuid
from datetime import datetime, timezone
+from types import SimpleNamespace
from unittest.mock import MagicMock
import msgspec
@@ -29,6 +30,7 @@ from sqlalchemy import select
from airflow.api_fastapi.common.cursors import apply_cursor_filter,
decode_cursor, encode_cursor
from airflow.api_fastapi.common.parameters import SortParam
+from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
@@ -85,6 +87,40 @@ class TestCursorPagination:
decoded = decode_cursor(token)
assert decoded == ["019462ab-1234-5678-9abc-def012345678"]
+ def
test_encode_cursor_with_column_form_to_replace_falls_back_to_row_attr(self):
+ """Column-form ``to_replace`` should still allow cursor encoding via
row attribute access."""
+ sp = SortParam(["id", "run_after"], TaskInstance, {"run_after":
DagRun.run_after})
+ sp.set_value(["run_after"])
+
+ row = SimpleNamespace(
+ run_after="2026-06-04T10:00:00+00:00",
+ id="019462ab-1234-5678-9abc-def012345678",
+ )
+
+ token = encode_cursor(row, sp)
+ decoded = decode_cursor(token)
+ assert decoded == [
+ "2026-06-04T10:00:00+00:00",
+ "019462ab-1234-5678-9abc-def012345678",
+ ]
+
+ def
test_encode_cursor_column_form_to_replace_raises_when_attribute_absent(self):
+ """
+ ``encode_cursor`` must raise ``NotImplementedError`` (not silently
encode ``None``)
+ when a column-form ``to_replace`` key has no corresponding attribute
on the row object.
+ A ``None`` token would cause the next-page WHERE to compare against
NULL and drop rows.
+ """
+ sp = SortParam(
+ ["id", "data_interval_start"], TaskInstance,
{"data_interval_start": DagRun.data_interval_start}
+ )
+ sp.set_value(["data_interval_start"])
+
+ # Row deliberately lacks data_interval_start, standing in for a primary model with no such attribute.
+ row = SimpleNamespace(id="019462ab-1234-5678-9abc-def012345678")
+
+ with pytest.raises(NotImplementedError, match="data_interval_start"):
+ encode_cursor(row, sp)
+
def test_apply_cursor_filter_wrong_value_count(self):
sp = self._make_sort_param_with_resolved_columns(["start_date"])
token = _msgpack_cursor_token(["only-one-value"])
diff --git a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
index e2742d88a94..ecf20a47c6f 100644
--- a/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
+++ b/airflow-core/tests/unit/api_fastapi/common/test_parameters.py
@@ -135,17 +135,29 @@ class TestSortParam:
assert param.row_value(row, "dag_run_id") == "manual__2026-04-22"
assert param.row_value(row, "id") == 42
- def test_row_value_raises_on_column_form_to_replace(self):
+ def test_row_value_column_form_to_replace_resolves_via_row_attribute(self):
"""
- Column-form ``to_replace`` is not supported by cursor encoding. The
helper must
- fail loudly so a future endpoint doesn't silently ship ``None`` cursor
tokens.
+ Column-form ``to_replace`` resolves by reading the same-named attribute off the row (a
+ ``SimpleNamespace`` stand-in here), so association proxies are usable for cursor encoding.
"""
param = SortParam(["dag_id"], DagModel, {"last_run_state":
DagRun.state}).set_value(
["last_run_state"]
)
- row = SimpleNamespace(id="test_dag")
- with pytest.raises(NotImplementedError, match="column-form
``to_replace``"):
- param.row_value(row, "last_run_state")
+ row = SimpleNamespace(id="test_dag", last_run_state="success")
+ assert param.row_value(row, "last_run_state") == "success"
+
+ def
test_row_value_column_form_to_replace_raises_when_attribute_absent(self):
+ """
+ Column-form ``to_replace`` must raise ``NotImplementedError`` (not
return ``None``)
+ when the primary model exposes no such attribute. A ``None`` cursor
token would cause
+ the next-page ``WHERE`` to compare against ``NULL`` and silently drop
rows.
+ """
+ param = SortParam(
+ ["dag_id"], DagModel, {"data_interval_start":
DagRun.data_interval_start}
+ ).set_value(["data_interval_start"])
+ row = SimpleNamespace(id="test_dag") # deliberately no
data_interval_start attribute
+ with pytest.raises(NotImplementedError, match="data_interval_start"):
+ param.row_value(row, "data_interval_start")
def test_primary_key_is_not_duplicated_when_alias_maps_to_pk(self):
"""Sorting by an alias that resolves to the PK must not append the PK
a second time."""
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 6edae03cb09..3627ba62d68 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -43,6 +43,7 @@ from airflow.models.dag_version import DagVersion
 from airflow.models.dagbundle import DagBundleModel
 from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF
 from airflow.models.task_store import TaskStoreModel
+from airflow.models.taskinstance import uuid7
 from airflow.models.taskinstancehistory import TaskInstanceHistory
 from airflow.models.taskmap import TaskMap
 from airflow.models.team import Team
@@ -979,10 +980,6 @@ class TestGetMappedTaskInstances:
         This verifies that even when UUIDs are assigned out of map_index order
         (as happens during retries), the response is still sorted 0, 1, 2, ...
         """
-        from sqlalchemy import update as sa_update
-
-        from airflow.models.taskinstance import uuid7
-
         self.create_dag_runs_with_mapped_tasks(
             dag_maker,
             session,
@@ -994,7 +991,7 @@ class TestGetMappedTaskInstances:
         # result must still follow integer map_index order, not UUID order.
         for map_index in [1, 3]:
             session.execute(
-                sa_update(TaskInstance)
+                update(TaskInstance)
                 .where(
                     TaskInstance.dag_id == "retry_dag",
                     TaskInstance.task_id == "task_2",
@@ -2157,6 +2154,47 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
             params={"task_group_id": "process_data_v2"},
         )
         assert response.status_code == 404
+
+    def test_cursor_pagination_order_by_run_after_roundtrips(self, test_client, session):
+        """
+        Sorting by ``run_after`` (a column-form ``to_replace`` backed by an association proxy)
+        must not raise a 500 when ``has_next=true``. Regression for
+        https://github.com/apache/airflow/issues/67970.
+
+        Verify the full cursor round-trip: the first page must include a ``next_cursor``,
+        and following that cursor must return the remaining TIs without overlap.
+        """
+        dag_id = "example_python_operator"
+        total_tis = 5
+        self.create_task_instances(
+            session,
+            task_instances=[
+                {"start_date": DEFAULT_DATETIME_1 + dt.timedelta(minutes=(i + 1))} for i in range(total_tis)
+            ],
+            dag_id=dag_id,
+        )
+        # First page — limit < total so next_cursor must be present
+        response1 = test_client.get(
+            "/dags/~/dagRuns/~/taskInstances",
+            params={"limit": 3, "order_by": ["-run_after"], "cursor": ""},
+        )
+        assert response1.status_code == 200, response1.json()
+        body1 = response1.json()
+        assert len(body1["task_instances"]) == 3
+        next_cursor = body1["next_cursor"]
+        assert next_cursor is not None, "next_cursor must be present when more rows exist"
+
+        # Second page — follow the cursor; must not 500 and must return remaining TIs
+        response2 = test_client.get(
+            "/dags/~/dagRuns/~/taskInstances",
+            params={"limit": 10, "order_by": ["-run_after"], "cursor": next_cursor},
+        )
+        assert response2.status_code == 200, response2.json()
+        body2 = response2.json()
+        ids1 = {ti["id"] for ti in body1["task_instances"]}
+        ids2 = {ti["id"] for ti in body2["task_instances"]}
+        assert ids1.isdisjoint(ids2), "Pages must not overlap"
+        assert len(ids1) + len(ids2) == total_tis
 
 
 class TestGetTaskDependencies(TestTaskInstanceEndpoint):