This is an automated email from the ASF dual-hosted git repository.

kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d74fbff4e32 Apply `rerun_with_latest_version` to TriggerDagRunOperator reruns (#67273)
d74fbff4e32 is described below

commit d74fbff4e32108df9276c9a907536d7da710f8dc
Author: Nathan Hadfield <[email protected]>
AuthorDate: Fri Jun 19 10:54:47 2026 +0100

    Apply `rerun_with_latest_version` to TriggerDagRunOperator reruns (#67273)
---
 .../src/airflow/api_fastapi/common/dagbag.py       | 25 +++++++++++++++++
 .../core_api/routes/public/backfills.py            |  2 +-
 .../core_api/routes/public/task_instances.py       |  2 +-
 .../api_fastapi/core_api/services/public/common.py | 26 ------------------
 .../core_api/services/public/dag_run.py            |  9 +++++--
 .../api_fastapi/execution_api/routes/dag_runs.py   |  5 ++--
 .../src/airflow/cli/commands/backfill_command.py   |  2 +-
 .../core_api/routes/public/test_dag_run.py         |  2 +-
 .../execution_api/versions/head/test_dag_runs.py   | 31 ++++++++++++++++++++++
 9 files changed, 70 insertions(+), 34 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index af3a05d98af..d87aca49a52 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -24,6 +24,7 @@ from sqlalchemy.orm import Session
 
 from airflow.configuration import conf
 from airflow.models.dagbag import DBDagBag
+from airflow.models.serialized_dag import SerializedDagModel
 
 if TYPE_CHECKING:
     from airflow.models.dagrun import DagRun
@@ -115,4 +116,28 @@ def get_dag_for_run_or_latest_version(
     return dag
 
 
+def resolve_run_on_latest_version(
+    explicit_value: bool | None,
+    dag_id: str,
+    session: Session,
+    fallback: bool = False,
+) -> bool:
+    """
+    Resolve run_on_latest_version using precedence: explicit > DAG-level > global config > fallback.
+
+    :param explicit_value: Value from the API request body (or None if not specified).
+    :param dag_id: The DAG ID to look up.
+    :param session: Database session.
+    :param fallback: Default to use when neither DAG-level nor global config is set.
+        Clear/rerun endpoints use False (the historical default).
+        Backfill endpoint uses True (the historical default for backfills).
+    """
+    if explicit_value is not None:
+        return explicit_value
+    serialized = SerializedDagModel.get_dag(dag_id, session=session)
+    if serialized and serialized.rerun_with_latest_version is not None:
+        return serialized.rerun_with_latest_version
+    return conf.getboolean("core", "rerun_with_latest_version", fallback=fallback)
+
+
 DagBagDep = Annotated[DBDagBag, Depends(dag_bag_from_app)]
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index 4e4bc90dceb..0dba6086e0a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -25,6 +25,7 @@ from sqlalchemy import select, update
 from sqlalchemy.orm import joinedload
 
 from airflow._shared.timezones import timezone
+from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
 from airflow.api_fastapi.common.db.common import (
     SessionDep,
     paginated_select,
@@ -42,7 +43,6 @@ from airflow.api_fastapi.core_api.openapi.exceptions import (
     create_openapi_http_exception_doc,
 )
 from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill
-from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
 from airflow.api_fastapi.logging.decorators import action_logging
 from airflow.exceptions import DagNotFound, DagRunTypeNotAllowed
 from airflow.models import DagRun
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 2f1c1d3d09b..68120ace153 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -38,6 +38,7 @@ from airflow.api_fastapi.common.dagbag import (
     get_dag_for_run,
     get_dag_for_run_or_latest_version,
     get_latest_version_of_dag,
+    resolve_run_on_latest_version,
 )
 from airflow.api_fastapi.common.db.common import SessionDep, apply_filters_to_select, paginated_select
 from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation
@@ -96,7 +97,6 @@ from airflow.api_fastapi.core_api.datamodels.task_instances import (
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
-from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
 from airflow.api_fastapi.core_api.services.public.task_instances import (
     BulkTaskInstanceService,
     _get_task_group_task_instances,
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
index e016cb908c4..37d3e5e08df 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
@@ -35,8 +35,6 @@ from airflow.api_fastapi.core_api.datamodels.common import (
     BulkUpdateAction,
     T,
 )
-from airflow.configuration import conf
-from airflow.models.serialized_dag import SerializedDagModel
 
 
 class BulkService(Generic[T], ABC):
@@ -122,27 +120,3 @@ class BulkService(Generic[T], ABC):
             setattr(model, key, value)
 
         return model
-
-
-def resolve_run_on_latest_version(
-    explicit_value: bool | None,
-    dag_id: str,
-    session: Session,
-    fallback: bool = False,
-) -> bool:
-    """
-    Resolve run_on_latest_version using precedence: explicit > DAG-level > global config > fallback.
-
-    :param explicit_value: Value from the API request body (or None if not specified).
-    :param dag_id: The DAG ID to look up.
-    :param session: Database session.
-    :param fallback: Default to use when neither DAG-level nor global config is set.
-        Clear/rerun endpoints use False (the historical default).
-        Backfill endpoint uses True (the historical default for backfills).
-    """
-    if explicit_value is not None:
-        return explicit_value
-    serialized = SerializedDagModel.get_dag(dag_id, session=session)
-    if serialized and serialized.rerun_with_latest_version is not None:
-        return serialized.rerun_with_latest_version
-    return conf.getboolean("core", "rerun_with_latest_version", fallback=fallback)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index c473b908b0e..ea9d3ca5ce2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -35,7 +35,12 @@ from airflow.api.common.mark_tasks import (
     set_dag_run_state_to_success,
 )
 from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
-from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, get_latest_version_of_dag
+from airflow.api_fastapi.common.dagbag import (
+    DagBagDep,
+    get_dag_for_run,
+    get_latest_version_of_dag,
+    resolve_run_on_latest_version,
+)
 from airflow.api_fastapi.common.db.task_instances import eager_load_TI_and_TIH_for_validation
 from airflow.api_fastapi.core_api.datamodels.common import (
     BulkActionNotOnExistence,
@@ -47,7 +52,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
 )
 from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody, DagRunMutableStates
 from airflow.api_fastapi.core_api.datamodels.task_instances import NewTaskResponse
-from airflow.api_fastapi.core_api.services.public.common import BulkService, resolve_run_on_latest_version
+from airflow.api_fastapi.core_api.services.public.common import BulkService
 from airflow.listeners.listener import get_listener_manager
 from airflow.models.dagrun import DagRun
 from airflow.models.taskinstance import TaskInstance
diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 1092369e913..31aeb96c63e 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -26,7 +26,7 @@ from sqlalchemy import func, select
 from sqlalchemy.exc import NoResultFound
 
 from airflow.api.common.trigger_dag import trigger_dag
-from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run
+from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run, resolve_run_on_latest_version
 from airflow.api_fastapi.common.db.common import SessionDep
 from airflow.api_fastapi.common.types import UtcDateTime
 from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
@@ -207,7 +207,8 @@ def clear_dag_run(
         )
     dag = get_dag_for_run(dag_bag, dag_run=dag_run, session=session)
 
-    dag.clear(run_id=run_id)
+    resolved_run_on_latest = resolve_run_on_latest_version(None, dag_id, session)
+    dag.clear(run_id=run_id, run_on_latest_version=resolved_run_on_latest)
 
 
 @router.get(
diff --git a/airflow-core/src/airflow/cli/commands/backfill_command.py b/airflow-core/src/airflow/cli/commands/backfill_command.py
index 5ce8cc92242..d76d435a391 100644
--- a/airflow-core/src/airflow/cli/commands/backfill_command.py
+++ b/airflow-core/src/airflow/cli/commands/backfill_command.py
@@ -24,7 +24,7 @@ import signal
 from tabulate import tabulate
 
 from airflow import settings
-from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
+from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
 from airflow.cli.simple_table import AirflowConsole
 from airflow.exceptions import AirflowConfigException
 from airflow.models.backfill import ReprocessBehavior, _create_backfill, _do_dry_run
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index ad11f764d48..d953e950dea 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -32,8 +32,8 @@ from airflow._shared.module_loading import qualname
 from airflow._shared.timezones import timezone
 from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity, DagDetails
 from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
+from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
 from airflow.api_fastapi.core_api.datamodels.dag_versions import DagVersionResponse
-from airflow.api_fastapi.core_api.services.public.common import resolve_run_on_latest_version
 from airflow.exceptions import ParamValidationError
 from airflow.models import DagModel, DagRun, Log
 from airflow.models.asset import AssetEvent, AssetModel
diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 5d3c37c9b52..fbe7cc56e4a 100644
--- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -17,6 +17,8 @@
 
 from __future__ import annotations
 
+from unittest import mock
+
 import pytest
 import time_machine
 from fastapi import Request
@@ -28,6 +30,7 @@ from airflow.api_fastapi.execution_api.security import require_auth
 from airflow.models import DagModel
 from airflow.models.dagrun import DagRun
 from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.serialization.definitions.dag import SerializedDAG
 from airflow.timetables.trigger import CronPartitionTimetable
 from airflow.utils.state import DagRunState, State
 from airflow.utils.types import DagRunType
@@ -381,6 +384,34 @@ class TestDagRunClear:
 
         assert response.status_code == 404
 
+    def test_dag_run_clear_invokes_resolver(self, client, session, dag_maker):
+        """Clearing resolves run_on_latest_version (no explicit override) and forwards it to dag.clear."""
+        dag_id = "test_clear_invokes_resolver"
+        run_id = "test_run_id"
+
+        with dag_maker(dag_id=dag_id, session=session, serialized=True):
+            EmptyOperator(task_id="test_task")
+        dag_maker.create_dagrun(run_id=run_id, state=DagRunState.SUCCESS)
+        session.commit()
+
+        with (
+            mock.patch(
+                "airflow.api_fastapi.execution_api.routes.dag_runs.resolve_run_on_latest_version",
+                return_value=mock.sentinel.resolved,
+            ) as mock_resolver,
+            mock.patch.object(SerializedDAG, "clear", autospec=True) as mock_clear,
+        ):
+            response = client.post(f"/execution/dag-runs/{dag_id}/{run_id}/clear")
+
+        assert response.status_code == 204
+        mock_resolver.assert_called_once()
+        # First positional arg is the explicit override; operator does not pass one.
+        assert mock_resolver.call_args.args[0] is None
+        # The resolved value must reach dag.clear, not be silently dropped.
+        mock_clear.assert_called_once()
+        assert mock_clear.call_args.kwargs["run_id"] == run_id
+        assert mock_clear.call_args.kwargs["run_on_latest_version"] is mock.sentinel.resolved
+
 
 class TestDagRunDetail:
     def setup_method(self):

Reply via email to