This is an automated email from the ASF dual-hosted git repository.
kaxil pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d74fbff4e32 Apply `rerun_with_latest_version` to TriggerDagRunOperator
reruns (#67273)
d74fbff4e32 is described below
commit d74fbff4e32108df9276c9a907536d7da710f8dc
Author: Nathan Hadfield <[email protected]>
AuthorDate: Fri Jun 19 10:54:47 2026 +0100
Apply `rerun_with_latest_version` to TriggerDagRunOperator reruns (#67273)
---
.../src/airflow/api_fastapi/common/dagbag.py | 25 +++++++++++++++++
.../core_api/routes/public/backfills.py | 2 +-
.../core_api/routes/public/task_instances.py | 2 +-
.../api_fastapi/core_api/services/public/common.py | 26 ------------------
.../core_api/services/public/dag_run.py | 9 +++++--
.../api_fastapi/execution_api/routes/dag_runs.py | 5 ++--
.../src/airflow/cli/commands/backfill_command.py | 2 +-
.../core_api/routes/public/test_dag_run.py | 2 +-
.../execution_api/versions/head/test_dag_runs.py | 31 ++++++++++++++++++++++
9 files changed, 70 insertions(+), 34 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
index af3a05d98af..d87aca49a52 100644
--- a/airflow-core/src/airflow/api_fastapi/common/dagbag.py
+++ b/airflow-core/src/airflow/api_fastapi/common/dagbag.py
@@ -24,6 +24,7 @@ from sqlalchemy.orm import Session
from airflow.configuration import conf
from airflow.models.dagbag import DBDagBag
+from airflow.models.serialized_dag import SerializedDagModel
if TYPE_CHECKING:
from airflow.models.dagrun import DagRun
@@ -115,4 +116,28 @@ def get_dag_for_run_or_latest_version(
return dag
+def resolve_run_on_latest_version(
+ explicit_value: bool | None,
+ dag_id: str,
+ session: Session,
+ fallback: bool = False,
+) -> bool:
+ """
+ Resolve run_on_latest_version using precedence: explicit > DAG-level >
global config > fallback.
+
+ :param explicit_value: Value from the API request body (or None if not
specified).
+ :param dag_id: The DAG ID to look up.
+ :param session: Database session.
+ :param fallback: Default to use when neither DAG-level nor global config
is set.
+ Clear/rerun endpoints use False (the historical default).
+ Backfill endpoint uses True (the historical default for backfills).
+ """
+ if explicit_value is not None:
+ return explicit_value
+ serialized = SerializedDagModel.get_dag(dag_id, session=session)
+ if serialized and serialized.rerun_with_latest_version is not None:
+ return serialized.rerun_with_latest_version
+ return conf.getboolean("core", "rerun_with_latest_version",
fallback=fallback)
+
+
DagBagDep = Annotated[DBDagBag, Depends(dag_bag_from_app)]
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index 4e4bc90dceb..0dba6086e0a 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -25,6 +25,7 @@ from sqlalchemy import select, update
from sqlalchemy.orm import joinedload
from airflow._shared.timezones import timezone
+from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
from airflow.api_fastapi.common.db.common import (
SessionDep,
paginated_select,
@@ -42,7 +43,6 @@ from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.security import GetUserDep,
requires_access_backfill
-from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import DagNotFound, DagRunTypeNotAllowed
from airflow.models import DagRun
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 2f1c1d3d09b..68120ace153 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -38,6 +38,7 @@ from airflow.api_fastapi.common.dagbag import (
get_dag_for_run,
get_dag_for_run_or_latest_version,
get_latest_version_of_dag,
+ resolve_run_on_latest_version,
)
from airflow.api_fastapi.common.db.common import SessionDep,
apply_filters_to_select, paginated_select
from airflow.api_fastapi.common.db.task_instances import
eager_load_TI_and_TIH_for_validation
@@ -96,7 +97,6 @@ from airflow.api_fastapi.core_api.datamodels.task_instances
import (
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import GetUserDep,
ReadableTIFilterDep, requires_access_dag
-from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.api_fastapi.core_api.services.public.task_instances import (
BulkTaskInstanceService,
_get_task_group_task_instances,
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
index e016cb908c4..37d3e5e08df 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
@@ -35,8 +35,6 @@ from airflow.api_fastapi.core_api.datamodels.common import (
BulkUpdateAction,
T,
)
-from airflow.configuration import conf
-from airflow.models.serialized_dag import SerializedDagModel
class BulkService(Generic[T], ABC):
@@ -122,27 +120,3 @@ class BulkService(Generic[T], ABC):
setattr(model, key, value)
return model
-
-
-def resolve_run_on_latest_version(
- explicit_value: bool | None,
- dag_id: str,
- session: Session,
- fallback: bool = False,
-) -> bool:
- """
- Resolve run_on_latest_version using precedence: explicit > DAG-level >
global config > fallback.
-
- :param explicit_value: Value from the API request body (or None if not
specified).
- :param dag_id: The DAG ID to look up.
- :param session: Database session.
- :param fallback: Default to use when neither DAG-level nor global config
is set.
- Clear/rerun endpoints use False (the historical default).
- Backfill endpoint uses True (the historical default for backfills).
- """
- if explicit_value is not None:
- return explicit_value
- serialized = SerializedDagModel.get_dag(dag_id, session=session)
- if serialized and serialized.rerun_with_latest_version is not None:
- return serialized.rerun_with_latest_version
- return conf.getboolean("core", "rerun_with_latest_version",
fallback=fallback)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
index c473b908b0e..ea9d3ca5ce2 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/dag_run.py
@@ -35,7 +35,12 @@ from airflow.api.common.mark_tasks import (
set_dag_run_state_to_success,
)
from airflow.api_fastapi.auth.managers.models.base_user import BaseUser
-from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run,
get_latest_version_of_dag
+from airflow.api_fastapi.common.dagbag import (
+ DagBagDep,
+ get_dag_for_run,
+ get_latest_version_of_dag,
+ resolve_run_on_latest_version,
+)
from airflow.api_fastapi.common.db.task_instances import
eager_load_TI_and_TIH_for_validation
from airflow.api_fastapi.core_api.datamodels.common import (
BulkActionNotOnExistence,
@@ -47,7 +52,7 @@ from airflow.api_fastapi.core_api.datamodels.common import (
)
from airflow.api_fastapi.core_api.datamodels.dag_run import BulkDAGRunBody,
DagRunMutableStates
from airflow.api_fastapi.core_api.datamodels.task_instances import
NewTaskResponse
-from airflow.api_fastapi.core_api.services.public.common import BulkService,
resolve_run_on_latest_version
+from airflow.api_fastapi.core_api.services.public.common import BulkService
from airflow.listeners.listener import get_listener_manager
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
diff --git
a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
index 1092369e913..31aeb96c63e 100644
--- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
+++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/dag_runs.py
@@ -26,7 +26,7 @@ from sqlalchemy import func, select
from sqlalchemy.exc import NoResultFound
from airflow.api.common.trigger_dag import trigger_dag
-from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run
+from airflow.api_fastapi.common.dagbag import DagBagDep, get_dag_for_run,
resolve_run_on_latest_version
from airflow.api_fastapi.common.db.common import SessionDep
from airflow.api_fastapi.common.types import UtcDateTime
from airflow.api_fastapi.compat import HTTP_422_UNPROCESSABLE_CONTENT
@@ -207,7 +207,8 @@ def clear_dag_run(
)
dag = get_dag_for_run(dag_bag, dag_run=dag_run, session=session)
- dag.clear(run_id=run_id)
+ resolved_run_on_latest = resolve_run_on_latest_version(None, dag_id,
session)
+ dag.clear(run_id=run_id, run_on_latest_version=resolved_run_on_latest)
@router.get(
diff --git a/airflow-core/src/airflow/cli/commands/backfill_command.py
b/airflow-core/src/airflow/cli/commands/backfill_command.py
index 5ce8cc92242..d76d435a391 100644
--- a/airflow-core/src/airflow/cli/commands/backfill_command.py
+++ b/airflow-core/src/airflow/cli/commands/backfill_command.py
@@ -24,7 +24,7 @@ import signal
from tabulate import tabulate
from airflow import settings
-from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
+from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import AirflowConfigException
from airflow.models.backfill import ReprocessBehavior, _create_backfill,
_do_dry_run
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index ad11f764d48..d953e950dea 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -32,8 +32,8 @@ from airflow._shared.module_loading import qualname
from airflow._shared.timezones import timezone
from airflow.api_fastapi.auth.managers.models.resource_details import
DagAccessEntity, DagDetails
from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser
+from airflow.api_fastapi.common.dagbag import resolve_run_on_latest_version
from airflow.api_fastapi.core_api.datamodels.dag_versions import
DagVersionResponse
-from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.exceptions import ParamValidationError
from airflow.models import DagModel, DagRun, Log
from airflow.models.asset import AssetEvent, AssetModel
diff --git
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
index 5d3c37c9b52..fbe7cc56e4a 100644
---
a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
+++
b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_dag_runs.py
@@ -17,6 +17,8 @@
from __future__ import annotations
+from unittest import mock
+
import pytest
import time_machine
from fastapi import Request
@@ -28,6 +30,7 @@ from airflow.api_fastapi.execution_api.security import
require_auth
from airflow.models import DagModel
from airflow.models.dagrun import DagRun
from airflow.providers.standard.operators.empty import EmptyOperator
+from airflow.serialization.definitions.dag import SerializedDAG
from airflow.timetables.trigger import CronPartitionTimetable
from airflow.utils.state import DagRunState, State
from airflow.utils.types import DagRunType
@@ -381,6 +384,34 @@ class TestDagRunClear:
assert response.status_code == 404
+ def test_dag_run_clear_invokes_resolver(self, client, session, dag_maker):
+ """Clearing resolves run_on_latest_version (no explicit override) and
forwards it to dag.clear."""
+ dag_id = "test_clear_invokes_resolver"
+ run_id = "test_run_id"
+
+ with dag_maker(dag_id=dag_id, session=session, serialized=True):
+ EmptyOperator(task_id="test_task")
+ dag_maker.create_dagrun(run_id=run_id, state=DagRunState.SUCCESS)
+ session.commit()
+
+ with (
+ mock.patch(
+
"airflow.api_fastapi.execution_api.routes.dag_runs.resolve_run_on_latest_version",
+ return_value=mock.sentinel.resolved,
+ ) as mock_resolver,
+ mock.patch.object(SerializedDAG, "clear", autospec=True) as
mock_clear,
+ ):
+ response =
client.post(f"/execution/dag-runs/{dag_id}/{run_id}/clear")
+
+ assert response.status_code == 204
+ mock_resolver.assert_called_once()
+ # First positional arg is the explicit override; operator does not
pass one.
+ assert mock_resolver.call_args.args[0] is None
+ # The resolved value must reach dag.clear, not be silently dropped.
+ mock_clear.assert_called_once()
+ assert mock_clear.call_args.kwargs["run_id"] == run_id
+ assert mock_clear.call_args.kwargs["run_on_latest_version"] is
mock.sentinel.resolved
+
class TestDagRunDetail:
def setup_method(self):