This is an automated email from the ASF dual-hosted git repository.
pierrejeambrun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 48cc0316003 Fix task group lookup using wrong DAG version for
historical runs (#63360)
48cc0316003 is described below
commit 48cc031600372d1695efd3847ec56270e3c3e7cd
Author: Pierre Jeambrun <[email protected]>
AuthorDate: Thu Mar 12 12:18:32 2026 +0100
Fix task group lookup using wrong DAG version for historical runs (#63360)
* Fix task group lookup using wrong DAG version for historical runs
When a task group was renamed between DAG versions, the API's
get_task_instances endpoint resolved task groups against the
latest DAG version instead of the version the run was created with.
This caused 404 errors when clicking on task groups in the grid view
for historical runs.
The fix changes get_dag_for_run_or_latest_version to prefer the
run's created_dag_version_id, falling back to the existing
get_dag_for_run lookup only when that ID is unset or no DAG can be
loaded for it.
* Move inline comment to docstring in get_dag_for_run_or_latest_version
---
.../src/airflow/api_fastapi/common/dagbag.py | 16 +++++++-
.../core_api/routes/public/test_task_instances.py | 44 +++++++++++++++++++++-
2 files changed, 58 insertions(+), 2 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index ce81f6906b7..3ca4483ce87 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -70,9 +70,23 @@ def get_dag_for_run(dag_bag: DBDagBag, dag_run: DagRun,
session: Session) -> Ser
def get_dag_for_run_or_latest_version(
dag_bag: DBDagBag, dag_run: DagRun | None, dag_id: str | None, session:
Session
) -> SerializedDAG:
+ """
+ Retrieve the serialized DAG for a specific run, or the latest version if
no run is given.
+
+ When a dag_run is provided, we prefer the exact DAG version the run was
created with
+ (``created_dag_version_id``) so that task group lookups, operator
metadata, etc. match
+ the DAG structure at the time of the run.
+
+ This is necessary because ``get_dag_for_run`` delegates to
``_version_from_dag_run``
+ which, for unversioned bundles (e.g. ``LocalDagBundle``), falls back to
the *latest*
+ ``DagVersion``.
+ """
dag: SerializedDAG | None = None
if dag_run:
- dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ if dag_run.created_dag_version_id:
+ dag = dag_bag._get_dag(dag_run.created_dag_version_id,
session=session)
+ if not dag:
+ dag = dag_bag.get_dag_for_run(dag_run, session=session)
elif dag_id:
dag = dag_bag.get_latest_version_of_dag(dag_id, session=session)
if not dag:
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
index 4a0ef3e1be9..6bf7213c9c8 100644
---
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
+++
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py
@@ -39,7 +39,8 @@ from airflow.models.renderedtifields import
RenderedTaskInstanceFields as RTIF
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.models.taskmap import TaskMap
from airflow.models.trigger import Trigger
-from airflow.sdk import BaseOperator
+from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.sdk import BaseOperator, TaskGroup
from airflow.utils.platform import getuser
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.types import DagRunType
@@ -1646,6 +1647,47 @@ class TestGetTaskInstances(TestTaskInstanceEndpoint):
assert (num_entries_batch1 + num_entries_batch2) == ti_count
assert response_batch1 != response_batch2
+ def test_task_group_filter_uses_run_version_not_latest(self, test_client,
dag_maker, session):
+ """
+ Task group lookup should use the DAG version from the run, not the
latest version.
+
+ When a task group is renamed between versions, clicking on a
historical run's
+ task group in the grid should still resolve correctly against the
version
+ that run was created with — not the latest version where the group may
have
+ a different name, i.e serialized_dag might not have that taskgroup
anymore.
+ """
+ dag_id = "test_tg_version"
+
+ # Version 1: task group named "process_data"
+ with dag_maker(dag_id, session=session):
+ with TaskGroup(group_id="process_data"):
+ EmptyOperator(task_id="step_1")
+ dag_maker.create_dagrun(run_id="run_v1")
+ session.commit()
+
+ # Version 2: task group renamed to "process_data_v2"
+ with dag_maker(dag_id, session=session):
+ with TaskGroup(group_id="process_data_v2"):
+ EmptyOperator(task_id="step_1")
+ session.commit()
+
+ # The run was created with v1 which had "process_data".
+ # Querying with the old group name must succeed.
+ response = test_client.get(
+ f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
+ params={"task_group_id": "process_data"},
+ )
+ assert response.status_code == 200, response.json()
+ assert response.json()["total_entries"] == 1
+ assert response.json()["task_instances"][0]["task_id"] ==
"process_data.step_1"
+
+ # The new group name should NOT be found in the old run's version.
+ response = test_client.get(
+ f"/dags/{dag_id}/dagRuns/run_v1/taskInstances",
+ params={"task_group_id": "process_data_v2"},
+ )
+ assert response.status_code == 404
+
class TestGetTaskDependencies(TestTaskInstanceEndpoint):
def setup_method(self):