This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 13e2125a2c3 Add rerun_with_latest_version config hierarchy for
clear/rerun behavior (#63884)
13e2125a2c3 is described below
commit 13e2125a2c3955d36f7bfd6c61935dea7b2b9248
Author: Nathan Hadfield <[email protected]>
AuthorDate: Wed May 20 18:35:23 2026 +0100
Add rerun_with_latest_version config hierarchy for clear/rerun behavior
(#63884)
Adds a three-level configuration hierarchy controlling the default
"run on latest version" checkbox when clearing DAG runs or task instances:
1. Explicit API request value (if provided)
2. DAG-level `rerun_with_latest_version` parameter
3. Global `[core] rerun_with_latest_version` config option
4. Default: False for clear/rerun (use original bundle version), True for backfills
API clear endpoints now accept `run_on_latest_version: null` and resolve
the default server-side using the hierarchy, so both UI and programmatic
callers benefit from the configuration.
closes: #60887
---
.../administration-and-deployment/dag-bundles.rst | 105 ++++++++++++++
airflow-core/newsfragments/63884.significant.rst | 34 +++++
.../api_fastapi/core_api/datamodels/backfills.py | 8 +-
.../api_fastapi/core_api/datamodels/dag_run.py | 9 +-
.../api_fastapi/core_api/datamodels/dags.py | 1 +
.../core_api/datamodels/task_instances.py | 9 +-
.../api_fastapi/core_api/datamodels/ui/config.py | 1 +
.../api_fastapi/core_api/openapi/_private_ui.yaml | 5 +
.../core_api/openapi/v2-rest-api-generated.yaml | 32 +++--
.../core_api/routes/public/backfills.py | 10 +-
.../api_fastapi/core_api/routes/public/dag_run.py | 5 +-
.../core_api/routes/public/task_instances.py | 9 +-
.../api_fastapi/core_api/routes/ui/config.py | 7 +
.../api_fastapi/core_api/services/public/common.py | 26 ++++
airflow-core/src/airflow/cli/cli_config.py | 8 +-
.../src/airflow/cli/commands/backfill_command.py | 13 +-
.../src/airflow/config_templates/config.yml | 13 ++
.../src/airflow/serialization/definitions/dag.py | 2 +
airflow-core/src/airflow/serialization/schema.json | 3 +-
.../airflow/serialization/serialized_objects.py | 1 +
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 57 ++++++--
.../airflow/ui/openapi-gen/requests/types.gen.ts | 15 +-
.../ui/src/components/Clear/Run/ClearRunDialog.tsx | 7 +-
.../TaskInstance/ClearGroupTaskInstanceDialog.tsx | 42 +++---
.../Clear/TaskInstance/ClearTaskInstanceDialog.tsx | 39 +++---
.../Clear/useRerunWithLatestVersion.test.tsx | 154 +++++++++++++++++++++
.../components/Clear/useRerunWithLatestVersion.ts | 55 ++++++++
.../src/components/DagActions/RunBackfillForm.tsx | 20 ++-
.../airflow/ui/src/queries/useCreateBackfill.ts | 1 +
.../core_api/routes/public/test_dag_run.py | 81 +++++++++++
.../core_api/routes/public/test_dags.py | 2 +
.../api_fastapi/core_api/routes/ui/test_config.py | 1 +
.../unit/cli/commands/test_backfill_command.py | 3 +
.../unit/serialization/test_dag_serialization.py | 23 +++
.../src/airflowctl/api/datamodels/generated.py | 17 ++-
task-sdk/src/airflow/sdk/definitions/dag.py | 7 +
36 files changed, 727 insertions(+), 98 deletions(-)
diff --git a/airflow-core/docs/administration-and-deployment/dag-bundles.rst
b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
index 9dd9b968120..74cf2ed298d 100644
--- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst
+++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst
@@ -139,6 +139,111 @@ are configured so that impersonated users can access
bundle files created by the
the need for shared group permissions.
+Configuring Default Rerun Version Behavior
+------------------------------------------
+
+When a user clears a DAG run or task instance, the UI shows a checkbox asking
whether to rerun
+with the latest bundle version or with the version the original run used. The
+``rerun_with_latest_version`` setting controls the default state of that
checkbox, so teams don't
+have to make that decision manually every time. The same setting also governs
the default
+``run_on_latest_version`` behavior when creating backfills via the API or CLI.
+
+.. note::
+
+ This only applies to versioned bundle types (like ``GitDagBundle``). Local
bundles
+ (``LocalDagBundle``) do not support versioning and will always use the
latest code.
+
+How It Works
+~~~~~~~~~~~~
+
+Each DAG has a **parsed version** (``DagModel.bundle_version``), updated every
time the dag
+processor re-parses the DAG file. Each DAG run records the bundle version it
was created with.
+
+When ``rerun_with_latest_version`` is **False**, clearing a DAG run preserves
its
+original bundle version, so the rerun uses the same code. This provides
reproducibility when
+debugging failures. When **True**, clearing updates the DAG run to the current
parsed version,
+ensuring the most recent code is used on rerun.
+
+The setting is resolved using the following precedence (highest to lowest):
+
+1. **Explicit request**: The ``run_on_latest_version`` parameter in the API
request body (if provided)
+2. **DAG-level**: The DAG's ``rerun_with_latest_version`` parameter (if
``True`` or ``False``)
+3. **Global config**: The ``[core] rerun_with_latest_version`` option (if set)
+4. **Per-call-site fallback**: ``False`` for clear/rerun, ``True`` for
backfills (preserving
+ the historical default for each path)
+
+Global Configuration
+~~~~~~~~~~~~~~~~~~~~
+
+Set organization-wide defaults using the ``[core] rerun_with_latest_version``
option:
+
+.. code-block:: ini
+
+ [core]
+ rerun_with_latest_version = False # Rerun with the original bundle version
+ # rerun_with_latest_version = True # Rerun with the latest bundle version
+
+When unset, the call site applies the historical fallback (``False`` for
clear/rerun,
+``True`` for backfills).
+
+DAG-Level Configuration
+~~~~~~~~~~~~~~~~~~~~~~~
+
+Override the global default for specific DAGs:
+
+.. code-block:: python
+
+ from datetime import datetime
+
+ from airflow import DAG
+ from airflow.operators.empty import EmptyOperator
+
+ # Always rerun with the latest version
+ with DAG(
+ dag_id="always_latest_dag",
+ rerun_with_latest_version=True,
+ start_date=datetime(2024, 1, 1),
+ ) as dag:
+ EmptyOperator(task_id="task")
+
+Use Cases
+~~~~~~~~~
+
+**Debugging failed runs**:
+ With ``False`` (the default), clearing a failed run reruns it with the
same code, making it
+ easier to reproduce and isolate issues.
+
+**Always run latest code**:
+ Set ``[core] rerun_with_latest_version = True`` if your team prefers
reruns to always pick up the
+ latest code, for example when bug fixes have been deployed since the
original run.
+
+**Mixed policy**:
+ Set the global default to ``True`` but override specific critical DAGs with
+ ``rerun_with_latest_version=False`` for version stability where it matters
most.
+
+Relationship with ``disable_bundle_versioning``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Airflow provides two separate settings that affect bundle versioning behavior.
+They serve different purposes:
+
+``disable_bundle_versioning``
+ Turns off version tracking entirely. When set to ``True``, no
``bundle_version`` is
+ recorded on DAG runs. Available as a DAG parameter and as a global config
option
+ (``[dag_processor] disable_bundle_versioning``).
+
+``rerun_with_latest_version``
+ Controls the default *rerun behavior* while keeping version tracking
active. When a user
+ clears or reruns a task, this determines whether the new run uses the
latest bundle
+ version or the original version. Versioning remains enabled so the version
history is
+ still recorded. This only changes the default choice presented to users.
+
+In short: ``disable_bundle_versioning`` answers "should we track versions at
all?", while
+``rerun_with_latest_version`` answers "when rerunning, which version should be
the default?".
+The two settings are configured independently, but ``rerun_with_latest_version`` has no effect
+when versioning is disabled.
+
+
Writing custom Dag bundles
--------------------------
diff --git a/airflow-core/newsfragments/63884.significant.rst
b/airflow-core/newsfragments/63884.significant.rst
new file mode 100644
index 00000000000..2695006b183
--- /dev/null
+++ b/airflow-core/newsfragments/63884.significant.rst
@@ -0,0 +1,34 @@
+Add ``rerun_with_latest_version`` configuration for DAG bundle versioning
+
+When clearing, rerunning, or backfilling tasks, this setting controls whether
the
+new DAG run uses the latest bundle version or the original version from the
+initial run. It applies to clear/rerun actions (UI and API) and to backfill
+creation (API and CLI). The default is resolved using the following precedence:
+
+1. **Explicit request**: ``run_on_latest_version`` parameter in the API
request body
+ or the ``--run-on-latest-version`` / ``--no-run-on-latest-version`` CLI
flag.
+2. **DAG-level**: ``rerun_with_latest_version`` parameter on the DAG
definition.
+3. **Global config**: ``[core] rerun_with_latest_version`` in ``airflow.cfg``.
+4. **Default**: ``False`` for clear/rerun, ``True`` for backfills (preserves
+ historical behavior).
+
+In Airflow 2.x, reruns always used the latest code. Airflow 3.x introduced
bundle
+versioning, defaulting to the original version. This setting gives users
control
+over which behaviour is the default.
+
+See :doc:`/administration-and-deployment/dag-bundles` for full details.
+
+* Types of change
+
+ * [ ] Dag changes
+ * [x] Config changes
+ * [x] API changes
+ * [x] CLI changes
+ * [x] Behaviour changes
+ * [ ] Plugin changes
+ * [ ] Dependency changes
+ * [ ] Code interface changes
+
+* Migration rules needed
+
+ * None - this is a new optional feature with backwards-compatible defaults
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py
index 64ed8bb05e9..52538b37e16 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/backfills.py
@@ -36,7 +36,13 @@ class BackfillPostBody(StrictBaseModel):
dag_run_conf: dict | None = None
reprocess_behavior: ReprocessBehavior = ReprocessBehavior.NONE
max_active_runs: int = 10
- run_on_latest_version: bool = True
+ run_on_latest_version: bool | None = Field(
+ default=None,
+ description="Run on the latest bundle version of the Dag for each
backfilled run. "
+ "If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, "
+ "then the ``[core] rerun_with_latest_version`` config option, "
+ "and finally ``True`` (the historical default for backfills).",
+ )
class BackfillResponse(BaseModel):
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index 481cc0387fe..b1e2500203e 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -59,9 +59,12 @@ class DAGRunClearBody(StrictBaseModel):
default=False,
description="Only queue newly added tasks in the latest Dag version
without clearing existing tasks.",
)
- run_on_latest_version: bool = Field(
- default=False,
- description="(Experimental) Run on the latest bundle version of the
Dag after clearing the Dag Run.",
+ run_on_latest_version: bool | None = Field(
+ default=None,
+ description="(Experimental) Run on the latest bundle version of the
Dag after clearing the Dag Run. "
+ "If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, "
+ "then the ``[core] rerun_with_latest_version`` config option, "
+ "and finally ``False`` (the historical default for clear/rerun).",
)
@model_validator(mode="before")
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
index 27853cfecb4..dea97c46352 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dags.py
@@ -202,6 +202,7 @@ class DAGDetailsResponse(DAGResponse):
timezone: str | None
last_parsed: datetime | None
default_args: Mapping | None
+ rerun_with_latest_version: bool | None = None
owner_links: dict[str, str] | None = None
is_favorite: bool = False
active_runs_count: int = 0
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index bcb97274570..85e14bc3baf 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -216,10 +216,13 @@ class ClearTaskInstancesBody(StrictBaseModel):
include_downstream: bool = False
include_future: bool = False
include_past: bool = False
- run_on_latest_version: bool = Field(
- default=False,
+ run_on_latest_version: bool | None = Field(
+ default=None,
description="(Experimental) Run on the latest bundle version of the
dag after "
- "clearing the task instances.",
+ "clearing the task instances. "
+ "If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, "
+ "then the ``[core] rerun_with_latest_version`` config option, "
+ "and finally ``False`` (the historical default for clear/rerun).",
)
prevent_running_task: bool = False
note: Annotated[str, StringConstraints(max_length=1000)] | None = None
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py
index a511b31142b..acf16006474 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/config.py
@@ -40,6 +40,7 @@ class ConfigResponse(BaseModel):
external_log_name: str | None = None
theme: Theme | None
multi_team: bool
+ rerun_with_latest_version: bool | None = None
@field_serializer("theme")
def serialize_theme(self, theme: Theme | None) -> dict | None:
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
index 52b8100a944..080f9c4ded3 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml
@@ -1974,6 +1974,11 @@ components:
multi_team:
type: boolean
title: Multi Team
+ rerun_with_latest_version:
+ anyOf:
+ - type: boolean
+ - type: 'null'
+ title: Rerun With Latest Version
type: object
required:
- fallback_page_limit
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index a4434151f3d..00a9c60d858 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -11423,9 +11423,14 @@ components:
title: Max Active Runs
default: 10
run_on_latest_version:
- type: boolean
+ anyOf:
+ - type: boolean
+ - type: 'null'
title: Run On Latest Version
- default: true
+ description: Run on the latest bundle version of the Dag for each
backfilled
+ run. If not specified, falls back to the DAG-level
``rerun_with_latest_version``
+ parameter, then the ``[core] rerun_with_latest_version`` config
option,
+ and finally ``True`` (the historical default for backfills).
additionalProperties: false
type: object
required:
@@ -12100,11 +12105,14 @@ components:
title: Include Past
default: false
run_on_latest_version:
- type: boolean
+ anyOf:
+ - type: boolean
+ - type: 'null'
title: Run On Latest Version
description: (Experimental) Run on the latest bundle version of the
dag
- after clearing the task instances.
- default: false
+ after clearing the task instances. If not specified, falls back to
the
+ DAG-level ``rerun_with_latest_version`` parameter, then the
``[core] rerun_with_latest_version``
+ config option, and finally ``False`` (the historical default for
clear/rerun).
prevent_running_task:
type: boolean
title: Prevent Running Task
@@ -12555,6 +12563,11 @@ components:
type: object
- type: 'null'
title: Default Args
+ rerun_with_latest_version:
+ anyOf:
+ - type: boolean
+ - type: 'null'
+ title: Rerun With Latest Version
owner_links:
anyOf:
- additionalProperties:
@@ -12844,11 +12857,14 @@ components:
clearing existing tasks.
default: false
run_on_latest_version:
- type: boolean
+ anyOf:
+ - type: boolean
+ - type: 'null'
title: Run On Latest Version
description: (Experimental) Run on the latest bundle version of the
Dag
- after clearing the Dag Run.
- default: false
+ after clearing the Dag Run. If not specified, falls back to the
DAG-level
+ ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version``
+ config option, and finally ``False`` (the historical default for
clear/rerun).
additionalProperties: false
type: object
title: DAGRunClearBody
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
index c8ba0b7be96..4e4bc90dceb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py
@@ -42,6 +42,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import (
create_openapi_http_exception_doc,
)
from airflow.api_fastapi.core_api.security import GetUserDep,
requires_access_backfill
+from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import DagNotFound, DagRunTypeNotAllowed
from airflow.models import DagRun
@@ -228,9 +229,16 @@ def cancel_backfill(backfill_id: NonNegativeInt, session:
SessionDep) -> Backfil
def create_backfill(
backfill_request: BackfillPostBody,
user: GetUserDep,
+ session: SessionDep,
) -> BackfillResponse:
from_date = timezone.coerce_datetime(backfill_request.from_date)
to_date = timezone.coerce_datetime(backfill_request.to_date)
+ resolved_run_on_latest = resolve_run_on_latest_version(
+ backfill_request.run_on_latest_version,
+ backfill_request.dag_id,
+ session,
+ fallback=True,
+ )
try:
backfill_obj = _create_backfill(
dag_id=backfill_request.dag_id,
@@ -241,7 +249,7 @@ def create_backfill(
dag_run_conf=backfill_request.dag_run_conf,
triggering_user_name=user.get_name(),
reprocess_behavior=backfill_request.reprocess_behavior,
- run_on_latest_version=backfill_request.run_on_latest_version,
+ run_on_latest_version=resolved_run_on_latest,
)
return BackfillResponse.model_validate(backfill_obj)
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index 0e3670409b4..4577a516089 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -97,6 +97,7 @@ from airflow.api_fastapi.core_api.security import (
requires_access_asset,
requires_access_dag,
)
+from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.api_fastapi.core_api.services.public.dag_run import DagRunWaiter
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.exceptions import ParamValidationError
@@ -317,6 +318,8 @@ def clear_dag_run(
if not dag:
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id}
was not found")
+ resolved_run_on_latest =
resolve_run_on_latest_version(body.run_on_latest_version, dag_id, session)
+
if body.dry_run:
if body.only_new:
# Determine "new" tasks by TI existence: a task is new when the
latest Dag
@@ -363,7 +366,7 @@ def clear_dag_run(
task_ids=None,
only_new=body.only_new,
only_failed=body.only_failed,
- run_on_latest_version=body.run_on_latest_version,
+ run_on_latest_version=resolved_run_on_latest,
session=session,
)
dag_run_cleared = session.scalar(select(DagRun).where(DagRun.id ==
dag_run.id))
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
index 083e4fa3057..e64435ce9f6 100644
---
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
+++
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py
@@ -96,6 +96,7 @@ from airflow.api_fastapi.core_api.datamodels.task_instances
import (
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import GetUserDep,
ReadableTIFilterDep, requires_access_dag
+from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.api_fastapi.core_api.services.public.task_instances import (
BulkTaskInstanceService,
_get_task_group_task_instances,
@@ -834,6 +835,8 @@ def post_clear_task_instances(
"""Clear task instances."""
dag = get_latest_version_of_dag(dag_bag, dag_id, session)
+ resolved_run_on_latest =
resolve_run_on_latest_version(body.run_on_latest_version, dag_id, session)
+
reset_dag_runs = body.reset_dag_runs
dry_run = body.dry_run
# We always pass dry_run here, otherwise this would try to confirm on the
terminal!
@@ -918,7 +921,7 @@ def post_clear_task_instances(
task_ids=task_markers_to_clear,
run_id=dag_run_id,
session=session,
- run_on_latest_version=body.run_on_latest_version,
+ run_on_latest_version=resolved_run_on_latest,
only_failed=body.only_failed,
only_running=body.only_running,
)
@@ -930,7 +933,7 @@ def post_clear_task_instances(
start_date=body.start_date,
end_date=body.end_date,
session=session,
- run_on_latest_version=body.run_on_latest_version,
+ run_on_latest_version=resolved_run_on_latest,
only_failed=body.only_failed,
only_running=body.only_running,
)
@@ -941,7 +944,7 @@ def post_clear_task_instances(
task_instances,
session,
DagRunState.QUEUED if reset_dag_runs else False,
- run_on_latest_version=body.run_on_latest_version,
+ run_on_latest_version=resolved_run_on_latest,
prevent_running_task=body.prevent_running_task,
)
except AirflowClearRunningTaskException as e:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py
b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py
index 95109382320..7a93583875f 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/config.py
@@ -62,6 +62,13 @@ def get_configs() -> ConfigResponse:
"external_log_name": getattr(task_log_reader.log_handler, "log_name",
None),
"theme": loads(conf.get("api", "theme", fallback="{}")) or None,
"multi_team": conf.getboolean("core", "multi_team"),
+ # Return None when the option isn't explicitly set so the UI hook can
apply
+ # its own fallback (False for clear/rerun, True for backfills).
+ "rerun_with_latest_version": (
+ conf.getboolean("core", "rerun_with_latest_version")
+ if conf.has_option("core", "rerun_with_latest_version")
+ else None
+ ),
}
config.update({key: value for key, value in additional_config.items()})
diff --git
a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
index 37d3e5e08df..e016cb908c4 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/common.py
@@ -35,6 +35,8 @@ from airflow.api_fastapi.core_api.datamodels.common import (
BulkUpdateAction,
T,
)
+from airflow.configuration import conf
+from airflow.models.serialized_dag import SerializedDagModel
class BulkService(Generic[T], ABC):
@@ -120,3 +122,27 @@ class BulkService(Generic[T], ABC):
setattr(model, key, value)
return model
+
+
+def resolve_run_on_latest_version(
+ explicit_value: bool | None,
+ dag_id: str,
+ session: Session,
+ fallback: bool = False,
+) -> bool:
+ """
+ Resolve run_on_latest_version using precedence: explicit > DAG-level >
global config > fallback.
+
+ :param explicit_value: Value from the API request body (or None if not
specified).
+ :param dag_id: The DAG ID to look up.
+ :param session: Database session.
+ :param fallback: Default to use when neither DAG-level nor global config
is set.
+ Clear/rerun endpoints use False (the historical default).
+ Backfill endpoint uses True (the historical default for backfills).
+ """
+ if explicit_value is not None:
+ return explicit_value
+ serialized = SerializedDagModel.get_dag(dag_id, session=session)
+ if serialized and serialized.rerun_with_latest_version is not None:
+ return serialized.rerun_with_latest_version
+ return conf.getboolean("core", "rerun_with_latest_version",
fallback=fallback)
diff --git a/airflow-core/src/airflow/cli/cli_config.py
b/airflow-core/src/airflow/cli/cli_config.py
index 81b9dcf0600..df0c2c15fbe 100644
--- a/airflow-core/src/airflow/cli/cli_config.py
+++ b/airflow-core/src/airflow/cli/cli_config.py
@@ -382,10 +382,12 @@ ARG_BACKFILL_RUN_ON_LATEST_VERSION = Arg(
("--run-on-latest-version",),
help=(
"(Experimental) The backfill will run tasks using the latest bundle
version instead of "
- "the version that was active when the original Dag run was created.
Defaults to True."
+ "the version that was active when the original Dag run was created. "
+ "If not specified, uses the DAG-level or global configuration default "
+ "(falling back to True for backfills to preserve historical behavior)."
),
- action="store_true",
- default=True,
+ action=argparse.BooleanOptionalAction,
+ default=None,
)
diff --git a/airflow-core/src/airflow/cli/commands/backfill_command.py
b/airflow-core/src/airflow/cli/commands/backfill_command.py
index 4b252d2b4b4..5ce8cc92242 100644
--- a/airflow-core/src/airflow/cli/commands/backfill_command.py
+++ b/airflow-core/src/airflow/cli/commands/backfill_command.py
@@ -24,6 +24,7 @@ import signal
from tabulate import tabulate
from airflow import settings
+from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.cli.simple_table import AirflowConsole
from airflow.exceptions import AirflowConfigException
from airflow.models.backfill import ReprocessBehavior, _create_backfill,
_do_dry_run
@@ -49,6 +50,14 @@ def create_backfill(args) -> None:
else:
reprocess_behavior = None
+ with create_session() as session:
+ resolved_run_on_latest = resolve_run_on_latest_version(
+ args.run_on_latest_version,
+ args.dag_id,
+ session,
+ fallback=True,
+ )
+
if args.dry_run:
console.print("Performing dry run of backfill.")
console.print("Printing params:")
@@ -60,7 +69,7 @@ def create_backfill(args) -> None:
reverse=args.run_backwards,
dag_run_conf=args.dag_run_conf,
reprocess_behavior=reprocess_behavior,
- run_on_latest_version=args.run_on_latest_version,
+ run_on_latest_version=resolved_run_on_latest,
)
for k, v in params.items():
console.print(f" - {k} = {v}")
@@ -105,5 +114,5 @@ def create_backfill(args) -> None:
dag_run_conf=dag_run_conf,
triggering_user_name=user,
reprocess_behavior=reprocess_behavior,
- run_on_latest_version=args.run_on_latest_version,
+ run_on_latest_version=resolved_run_on_latest,
)
diff --git a/airflow-core/src/airflow/config_templates/config.yml
b/airflow-core/src/airflow/config_templates/config.yml
index 0dfbbbda6c4..f7f958bb84b 100644
--- a/airflow-core/src/airflow/config_templates/config.yml
+++ b/airflow-core/src/airflow/config_templates/config.yml
@@ -520,6 +520,19 @@ core:
type: boolean
example: ~
default: "False"
+ rerun_with_latest_version:
+ description: |
+ Default value for whether cleared, rerun, or backfilled tasks should
use
+ the latest bundle version. When set to True, reruns and backfills pick
up
+ the latest code. When set to False, they use the original bundle
version.
+ When unset, the fallback depends on the call site: False for clearing
or
+ rerunning tasks, True for creating backfills (preserving the historical
+ defaults for each). Individual DAGs can override this with the
+ ``rerun_with_latest_version`` parameter.
+ type: boolean
+ example: ~
+ default: ~
+ version_added: 3.2.0
database:
description: ~
options:
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py
b/airflow-core/src/airflow/serialization/definitions/dag.py
index dad34ddd0e6..d20b9c22c03 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -103,6 +103,7 @@ class SerializedDAG:
allowed_run_types: list[str] | None = None
description: str | None = None
disable_bundle_versioning: bool = False
+ rerun_with_latest_version: bool | None = None
doc_md: str | None = None
edge_info: dict[str, dict[str, EdgeInfoType]] = attrs.field(factory=dict)
end_date: datetime.datetime | None = None
@@ -152,6 +153,7 @@ class SerializedDAG:
"allowed_run_types",
"description",
"disable_bundle_versioning",
+ "rerun_with_latest_version",
"doc_md",
"edge_info",
"end_date",
diff --git a/airflow-core/src/airflow/serialization/schema.json
b/airflow-core/src/airflow/serialization/schema.json
index e2eed749c94..2ec82785631 100644
--- a/airflow-core/src/airflow/serialization/schema.json
+++ b/airflow-core/src/airflow/serialization/schema.json
@@ -224,7 +224,8 @@
]},
"edge_info": { "$ref": "#/definitions/edge_info" },
"dag_dependencies": { "$ref": "#/definitions/dag_dependencies" },
- "disable_bundle_versioning": {"type": "boolean" }
+ "disable_bundle_versioning": {"type": "boolean" },
+ "rerun_with_latest_version": {"type": ["boolean", "null"], "default":
null}
},
"required": [
"dag_id",
diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py
b/airflow-core/src/airflow/serialization/serialized_objects.py
index cf0e2dc149f..0d9631089bf 100644
--- a/airflow-core/src/airflow/serialization/serialized_objects.py
+++ b/airflow-core/src/airflow/serialization/serialized_objects.py
@@ -2230,6 +2230,7 @@ class LazyDeserializedDAG(pydantic.BaseModel):
"jinja_environment_kwargs",
"relative_fileloc",
"disable_bundle_versioning",
+ "rerun_with_latest_version",
"fail_fast",
"last_loaded",
}
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 9fc0f4c7fb5..108480b3dd1 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -523,9 +523,16 @@ export const $BackfillPostBody = {
default: 10
},
run_on_latest_version: {
- type: 'boolean',
+ anyOf: [
+ {
+ type: 'boolean'
+ },
+ {
+ type: 'null'
+ }
+ ],
title: 'Run On Latest Version',
- default: true
+ description: 'Run on the latest bundle version of the Dag for each
backfilled run. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``True`` (the historical
default for backfills).'
}
},
additionalProperties: false,
@@ -1464,10 +1471,16 @@ export const $ClearTaskInstancesBody = {
default: false
},
run_on_latest_version: {
- type: 'boolean',
+ anyOf: [
+ {
+ type: 'boolean'
+ },
+ {
+ type: 'null'
+ }
+ ],
title: 'Run On Latest Version',
- description: '(Experimental) Run on the latest bundle version of
the dag after clearing the task instances.',
- default: false
+ description: '(Experimental) Run on the latest bundle version of
the dag after clearing the task instances. If not specified, falls back to the
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).'
},
prevent_running_task: {
type: 'boolean',
@@ -2240,6 +2253,17 @@ export const $DAGDetailsResponse = {
],
title: 'Default Args'
},
+ rerun_with_latest_version: {
+ anyOf: [
+ {
+ type: 'boolean'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Rerun With Latest Version'
+ },
owner_links: {
anyOf: [
{
@@ -2590,10 +2614,16 @@ export const $DAGRunClearBody = {
default: false
},
run_on_latest_version: {
- type: 'boolean',
+ anyOf: [
+ {
+ type: 'boolean'
+ },
+ {
+ type: 'null'
+ }
+ ],
title: 'Run On Latest Version',
- description: '(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run.',
- default: false
+ description: '(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run. If not specified, falls back to the
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).'
}
},
additionalProperties: false,
@@ -7701,6 +7731,17 @@ export const $ConfigResponse = {
multi_team: {
type: 'boolean',
title: 'Multi Team'
+ },
+ rerun_with_latest_version: {
+ anyOf: [
+ {
+ type: 'boolean'
+ },
+ {
+ type: 'null'
+ }
+ ],
+ title: 'Rerun With Latest Version'
}
},
type: 'object',
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index ee025fc11a5..1ea5961b919 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -150,7 +150,10 @@ export type BackfillPostBody = {
} | null;
reprocess_behavior?: ReprocessBehavior;
max_active_runs?: number;
- run_on_latest_version?: boolean;
+ /**
+ * Run on the latest bundle version of the Dag for each backfilled run. If
not specified, falls back to the DAG-level ``rerun_with_latest_version``
parameter, then the ``[core] rerun_with_latest_version`` config option, and
finally ``True`` (the historical default for backfills).
+ */
+ run_on_latest_version?: boolean | null;
};
/**
@@ -455,9 +458,9 @@ export type ClearTaskInstancesBody = {
include_future?: boolean;
include_past?: boolean;
/**
- * (Experimental) Run on the latest bundle version of the dag after
clearing the task instances.
+ * (Experimental) Run on the latest bundle version of the dag after
clearing the task instances. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).
*/
- run_on_latest_version?: boolean;
+ run_on_latest_version?: boolean | null;
prevent_running_task?: boolean;
note?: string | null;
};
@@ -606,6 +609,7 @@ export type DAGDetailsResponse = {
default_args: {
[key: string]: unknown;
} | null;
+ rerun_with_latest_version?: boolean | null;
owner_links?: {
[key: string]: (string);
} | null;
@@ -692,9 +696,9 @@ export type DAGRunClearBody = {
*/
only_new?: boolean;
/**
- * (Experimental) Run on the latest bundle version of the Dag after
clearing the Dag Run.
+ * (Experimental) Run on the latest bundle version of the Dag after
clearing the Dag Run. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).
*/
- run_on_latest_version?: boolean;
+ run_on_latest_version?: boolean | null;
};
/**
@@ -1929,6 +1933,7 @@ export type ConfigResponse = {
external_log_name?: string | null;
theme: Theme | null;
multi_team: boolean;
+ rerun_with_latest_version?: boolean | null;
};
/**
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
index 74a2332bc47..05f15856c6b 100644
--- a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
@@ -24,6 +24,7 @@ import { CgRedo } from "react-icons/cg";
import { useDagServiceGetDagDetails } from "openapi/queries";
import type { DAGRunResponse } from "openapi/requests/types.gen";
import { ActionAccordion } from "src/components/ActionAccordion";
+import { useRerunWithLatestVersion } from
"src/components/Clear/useRerunWithLatestVersion";
import { Checkbox, Dialog } from "src/components/ui";
import SegmentedControl from "src/components/ui/SegmentedControl";
import { useClearDagRunDryRun } from "src/queries/useClearDagRunDryRun";
@@ -46,13 +47,15 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
const [selectedOptions, setSelectedOptions] =
useState<Array<string>>(["existingTasks"]);
const onlyFailed = selectedOptions.includes("onlyFailed");
const onlyNew = selectedOptions.includes("newTasks");
- const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
- // Get current DAG's bundle version to compare with DAG run's bundle version
const { data: dagDetails } = useDagServiceGetDagDetails({
dagId,
});
+ const { setValue: setRunOnLatestVersion, value: runOnLatestVersion } =
useRerunWithLatestVersion({
+ dagLevelConfig: dagDetails?.rerun_with_latest_version,
+ });
+
const refetchInterval = useAutoRefresh({ dagId });
const { data: affectedTasks = { task_instances: [], total_entries: 0 } } =
useClearDagRunDryRun({
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
index 270da2c7b39..ab61c46e7ae 100644
---
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
@@ -17,7 +17,7 @@
* under the License.
*/
import { Button, Flex, Heading, VStack } from "@chakra-ui/react";
-import { useEffect, useRef, useState } from "react";
+import { useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
import { useParams } from "react-router-dom";
@@ -25,6 +25,7 @@ import { useParams } from "react-router-dom";
import { useDagServiceGetDagDetails, useTaskInstanceServiceGetTaskInstances }
from "openapi/queries";
import type { LightGridTaskInstanceSummary, TaskInstanceResponse } from
"openapi/requests/types.gen";
import { ActionAccordion } from "src/components/ActionAccordion";
+import { useRerunWithLatestVersion } from
"src/components/Clear/useRerunWithLatestVersion";
import { Checkbox, Dialog } from "src/components/ui";
import SegmentedControl from "src/components/ui/SegmentedControl";
import { useClearTaskInstances } from "src/queries/useClearTaskInstances";
@@ -57,9 +58,6 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open,
taskInstance }: Pr
const future = selectedOptions.includes("future");
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
- const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
- const userToggledRunOnLatestRef = useRef(false);
-
const [note, setNote] = useState<string | null>(null);
const { data: dagDetails } = useDagServiceGetDagDetails({
@@ -80,6 +78,21 @@ export const ClearGroupTaskInstanceDialog = ({ onClose,
open, taskInstance }: Pr
const groupTaskIds = groupTaskInstances?.task_instances.map((ti) =>
ti.task_id) ?? [];
+ const { dagVersionsDiffer, shouldShowRunOnLatestOption } =
getRunOnLatestVersionState({
+ latestBundleVersion: dagDetails?.bundle_version,
+ latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
+ selectedDagVersionNumber: taskInstance.dag_version_number,
+ // Fall back to legacy heuristic when grid summary has no version (older
API).
+ useLatestBundleVersionAsFallback: true,
+ });
+
+ // dagVersionsDiffer becomes the fallback so the historical "auto-check when
versions
+ // differ" heuristic still applies when neither DAG-level nor global config
is set.
+ const { setValue: setRunOnLatestVersion, value: runOnLatestVersion } =
useRerunWithLatestVersion({
+ dagLevelConfig: dagDetails?.rerun_with_latest_version,
+ fallback: dagVersionsDiffer,
+ });
+
const refetchInterval = useAutoRefresh({ dagId });
const { data } = useClearTaskInstancesDryRun({
@@ -109,22 +122,6 @@ export const ClearGroupTaskInstanceDialog = ({ onClose,
open, taskInstance }: Pr
total_entries: 0,
};
- const { dagVersionsDiffer, shouldShowRunOnLatestOption } =
getRunOnLatestVersionState({
- latestBundleVersion: dagDetails?.bundle_version,
- latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
- selectedDagVersionNumber: taskInstance.dag_version_number,
- // Fall back to legacy heuristic when grid summary has no version (older
API).
- useLatestBundleVersionAsFallback: true,
- });
-
- useEffect(() => {
- if (!open) {
- userToggledRunOnLatestRef.current = false;
- } else if (!userToggledRunOnLatestRef.current) {
- setRunOnLatestVersion(dagVersionsDiffer);
- }
- }, [open, dagVersionsDiffer]);
-
return (
<Dialog.Root lazyMount onOpenChange={onClose} open={open}>
<Dialog.Content backdrop>
@@ -184,10 +181,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose,
open, taskInstance }: Pr
{shouldShowRunOnLatestOption ? (
<Checkbox
checked={runOnLatestVersion}
- onCheckedChange={(event) => {
- userToggledRunOnLatestRef.current = true;
- setRunOnLatestVersion(Boolean(event.checked));
- }}
+ onCheckedChange={(event) =>
setRunOnLatestVersion(Boolean(event.checked))}
>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
index 43b76846789..b281fef7a07 100644
---
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
@@ -17,13 +17,14 @@
* under the License.
*/
import { Button, Flex, Heading, useDisclosure, VStack } from
"@chakra-ui/react";
-import { useEffect, useRef, useState } from "react";
+import { useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
import { useDagServiceGetDagDetails } from "openapi/queries";
import type { TaskInstanceResponse } from "openapi/requests/types.gen";
import { ActionAccordion } from "src/components/ActionAccordion";
+import { useRerunWithLatestVersion } from
"src/components/Clear/useRerunWithLatestVersion";
import Time from "src/components/Time";
import { Checkbox, Dialog } from "src/components/ui";
import SegmentedControl from "src/components/ui/SegmentedControl";
@@ -89,8 +90,6 @@ const ClearTaskInstanceDialog = (props: Props) => {
const future = selectedOptions.includes("future");
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
- const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
- const userToggledRunOnLatestRef = useRef(false);
const [preventRunningTask, setPreventRunningTask] = useState(true);
const [note, setNote] = useState<string | null>(taskInstance?.note ?? null);
@@ -105,6 +104,20 @@ const ClearTaskInstanceDialog = (props: Props) => {
dagId,
});
+ const { dagVersionsDiffer, shouldShowRunOnLatestOption } =
getRunOnLatestVersionState({
+ latestBundleVersion: dagDetails?.bundle_version,
+ latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
+ selectedBundleVersion: taskInstance?.dag_version?.bundle_version,
+ selectedDagVersionNumber: taskInstance?.dag_version?.version_number,
+ });
+
+ // dagVersionsDiffer becomes the fallback so the historical "auto-check when
versions
+ // differ" heuristic still applies when neither DAG-level nor global config
is set.
+ const { setValue: setRunOnLatestVersion, value: runOnLatestVersion } =
useRerunWithLatestVersion({
+ dagLevelConfig: dagDetails?.rerun_with_latest_version,
+ fallback: dagVersionsDiffer,
+ });
+
const refetchInterval = useAutoRefresh({ dagId });
const { data } = useClearTaskInstancesDryRun({
@@ -134,21 +147,6 @@ const ClearTaskInstanceDialog = (props: Props) => {
total_entries: 0,
};
- const { dagVersionsDiffer, shouldShowRunOnLatestOption } =
getRunOnLatestVersionState({
- latestBundleVersion: dagDetails?.bundle_version,
- latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
- selectedBundleVersion: taskInstance?.dag_version?.bundle_version,
- selectedDagVersionNumber: taskInstance?.dag_version?.version_number,
- });
-
- useEffect(() => {
- if (!openDialog) {
- userToggledRunOnLatestRef.current = false;
- } else if (!userToggledRunOnLatestRef.current) {
- setRunOnLatestVersion(dagVersionsDiffer);
- }
- }, [openDialog, dagVersionsDiffer]);
-
return (
<>
<Dialog.Root lazyMount onOpenChange={onCloseDialog} open={openDialog ?
!open : false}>
@@ -219,10 +217,7 @@ const ClearTaskInstanceDialog = (props: Props) => {
{shouldShowRunOnLatestOption ? (
<Checkbox
checked={runOnLatestVersion}
- onCheckedChange={(event) => {
- userToggledRunOnLatestRef.current = true;
- setRunOnLatestVersion(Boolean(event.checked));
- }}
+ onCheckedChange={(event) =>
setRunOnLatestVersion(Boolean(event.checked))}
>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/useRerunWithLatestVersion.test.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/useRerunWithLatestVersion.test.tsx
new file mode 100644
index 00000000000..9a99ee5bd58
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/components/Clear/useRerunWithLatestVersion.test.tsx
@@ -0,0 +1,154 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { act, renderHook } from "@testing-library/react";
+import { describe, expect, it, vi } from "vitest";
+
+import { useConfig } from "src/queries/useConfig";
+
+import { useRerunWithLatestVersion } from "./useRerunWithLatestVersion";
+
+// Mock useConfig to control the global config value
+vi.mock("src/queries/useConfig", () => ({
+ useConfig: vi.fn(),
+}));
+
+const mockUseConfig = vi.mocked(useConfig);
+
+describe("useRerunWithLatestVersion — precedence", () => {
+ it("falls back to default when config is not yet resolved", () => {
+ mockUseConfig.mockReturnValue(undefined);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: undefined }));
+
+ expect(result.current.value).toBe(false);
+ });
+
+ it("DAG-level true takes precedence over global false", () => {
+ mockUseConfig.mockReturnValue(false);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: true }));
+
+ expect(result.current.value).toBe(true);
+ });
+
+ it("DAG-level false takes precedence over global true", () => {
+ mockUseConfig.mockReturnValue(true);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: false }));
+
+ expect(result.current.value).toBe(false);
+ });
+
+ it("falls back to global config when DAG-level is null", () => {
+ mockUseConfig.mockReturnValue(true);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: null }));
+
+ expect(result.current.value).toBe(true);
+ });
+
+ it("falls back to false when DAG-level is null and global is false", () => {
+ mockUseConfig.mockReturnValue(false);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: null }));
+
+ expect(result.current.value).toBe(false);
+ });
+
+ it("defaults to false when no config is set", () => {
+ mockUseConfig.mockReturnValue(undefined);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: null }));
+
+ expect(result.current.value).toBe(false);
+ });
+
+ it("applies global config when DAG details are still loading", () => {
+ // Global config loads first, DAG details still undefined
+ mockUseConfig.mockReturnValue(true);
+
+ const { rerender, result } = renderHook(
+ ({ dagLevelConfig }: { dagLevelConfig?: boolean | null }) =>
+ useRerunWithLatestVersion({ dagLevelConfig }),
+ { initialProps: { dagLevelConfig: undefined as boolean | null |
undefined } },
+ );
+
+ // Global config is true, DAG details not yet loaded -> should apply
global default
+ expect(result.current.value).toBe(true);
+
+ // DAG details load with explicit false -> should override to false
+ rerender({ dagLevelConfig: false });
+
+ expect(result.current.value).toBe(false);
+ });
+
+ it("uses fallback=true for backfills when no DAG or global config", () => {
+ mockUseConfig.mockReturnValue(undefined);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: null, fallback: true }));
+
+ expect(result.current.value).toBe(true);
+ });
+
+ it("DAG-level false overrides fallback=true", () => {
+ mockUseConfig.mockReturnValue(undefined);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: false, fallback: true }));
+
+ expect(result.current.value).toBe(false);
+ });
+});
+
+describe("useRerunWithLatestVersion — user toggle", () => {
+ it("user toggle overrides the resolved default", () => {
+ mockUseConfig.mockReturnValue(true);
+
+ const { result } = renderHook(() => useRerunWithLatestVersion({
dagLevelConfig: true }));
+
+ expect(result.current.value).toBe(true);
+
+ act(() => {
+ result.current.setValue(false);
+ });
+
+ expect(result.current.value).toBe(false);
+ });
+
+ it("user toggle is preserved when config changes", () => {
+ mockUseConfig.mockReturnValue(false);
+
+ const { rerender, result } = renderHook(
+ ({ dagLevelConfig }: { dagLevelConfig?: boolean | null }) =>
+ useRerunWithLatestVersion({ dagLevelConfig }),
+ { initialProps: { dagLevelConfig: false as boolean | null } },
+ );
+
+ // User toggles to true
+ act(() => {
+ result.current.setValue(true);
+ });
+
+ expect(result.current.value).toBe(true);
+
+ // Re-render with the same props (the config value is unchanged here); the user toggle should be preserved
+ rerender({ dagLevelConfig: false });
+
+ expect(result.current.value).toBe(true);
+ });
+});
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/useRerunWithLatestVersion.ts
b/airflow-core/src/airflow/ui/src/components/Clear/useRerunWithLatestVersion.ts
new file mode 100644
index 00000000000..c0f747b09c2
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/components/Clear/useRerunWithLatestVersion.ts
@@ -0,0 +1,55 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useState } from "react";
+
+import { useConfig } from "src/queries/useConfig";
+
+type UseRerunWithLatestVersionProps = {
+ /** DAG-level rerun_with_latest_version from DAG details. */
+ dagLevelConfig?: boolean | null;
+ /**
+ * Default to use when neither DAG-level nor global config is set.
+ * Use `false` for clear/rerun (the historical default) and `true` for
backfills.
+ */
+ fallback?: boolean;
+};
+
+type UseRerunWithLatestVersionResult = {
+ setValue: (newValue: boolean) => void;
+ value: boolean;
+};
+
+/**
+ * Resolves the default checkbox state for "Run on Latest Version".
+ * Precedence: user override > DAG-level > global config > fallback.
+ *
+ * The override is `undefined` until the user toggles the checkbox; once set,
it
+ * locks the value and won't be reset by config changes.
+ */
+export const useRerunWithLatestVersion = ({
+ dagLevelConfig,
+ fallback = false,
+}: UseRerunWithLatestVersionProps): UseRerunWithLatestVersionResult => {
+ const globalConfigValue = useConfig("rerun_with_latest_version") as boolean
| undefined;
+ const [override, setOverride] = useState<boolean | undefined>(undefined);
+
+ const value = override ?? dagLevelConfig ?? globalConfigValue ?? fallback;
+
+ return { setValue: setOverride, value };
+};
diff --git
a/airflow-core/src/airflow/ui/src/components/DagActions/RunBackfillForm.tsx
b/airflow-core/src/airflow/ui/src/components/DagActions/RunBackfillForm.tsx
index b9a07eb3109..a37eff9c455 100644
--- a/airflow-core/src/airflow/ui/src/components/DagActions/RunBackfillForm.tsx
+++ b/airflow-core/src/airflow/ui/src/components/DagActions/RunBackfillForm.tsx
@@ -22,7 +22,9 @@ import { useEffect, useState } from "react";
import { Controller, useForm, useWatch } from "react-hook-form";
import { useTranslation } from "react-i18next";
+import { useDagServiceGetDagDetails } from "openapi/queries";
import type { BackfillPostBody, DAGResponse, DAGWithLatestDagRunsResponse }
from "openapi/requests/types.gen";
+import { useRerunWithLatestVersion } from
"src/components/Clear/useRerunWithLatestVersion";
import { RadioCardItem, RadioCardLabel, RadioCardRoot } from
"src/components/ui/RadioCard";
import { reprocessBehaviors } from "src/constants/reprocessBehaviourParams";
import { useCreateBackfill } from "src/queries/useCreateBackfill";
@@ -54,18 +56,24 @@ const RunBackfillForm = ({ dag, onClose }:
RunBackfillFormProps) => {
const [formError, setFormError] = useState(false);
const initialParamsDict = useDagParams(dag.dag_id, true);
const { conf } = useParamStore();
+ const { data: dagDetails } = useDagServiceGetDagDetails({ dagId: dag.dag_id
});
+ const { value: resolvedRunOnLatest } = useRerunWithLatestVersion({
+ dagLevelConfig: dagDetails?.rerun_with_latest_version,
+ fallback: true,
+ });
const { control, handleSubmit, reset, watch } = useForm<BackfillFormProps>({
- defaultValues: {
+ mode: "onBlur",
+ resetOptions: { keepDirtyValues: true },
+ values: {
conf,
dag_id: dag.dag_id,
from_date: "",
max_active_runs: 1,
reprocess_behavior: "none",
run_backwards: false,
- run_on_latest_version: true,
+ run_on_latest_version: resolvedRunOnLatest,
to_date: "",
- },
- mode: "onBlur",
+ } as BackfillFormProps,
});
const values = useWatch<BackfillFormProps>({
control,
@@ -83,7 +91,7 @@ const RunBackfillForm = ({ dag, onClose }:
RunBackfillFormProps) => {
max_active_runs: values.max_active_runs ?? 1,
reprocess_behavior: values.reprocess_behavior,
run_backwards: values.run_backwards ?? false,
- run_on_latest_version: values.run_on_latest_version ?? true,
+ run_on_latest_version: values.run_on_latest_version,
to_date: values.to_date ?? "",
},
},
@@ -231,7 +239,7 @@ const RunBackfillForm = ({ dag, onClose }:
RunBackfillFormProps) => {
control={control}
name="run_on_latest_version"
render={({ field }) => (
- <Checkbox checked={field.value} onChange={field.onChange}>
+ <Checkbox checked={field.value ?? undefined} colorPalette="brand"
onChange={field.onChange}>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
)}
diff --git a/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts
b/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts
index c99430e8cb6..01371bf0051 100644
--- a/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useCreateBackfill.ts
@@ -84,6 +84,7 @@ export const useCreateBackfill = ({ onSuccessConfirm }: {
onSuccessConfirm: () =
max_active_runs: data.requestBody.max_active_runs,
reprocess_behavior: data.requestBody.reprocess_behavior,
run_backwards: data.requestBody.run_backwards,
+ run_on_latest_version: data.requestBody.run_on_latest_version,
to_date: formattedDataIntervalEnd,
},
});
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 1c2e8b32196..6dcd2b41219 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -28,6 +28,7 @@ from sqlalchemy import func, select
from airflow._shared.timezones import timezone
from airflow.api_fastapi.core_api.datamodels.dag_versions import
DagVersionResponse
+from airflow.api_fastapi.core_api.services.public.common import
resolve_run_on_latest_version
from airflow.models import DagModel, DagRun, Log
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.taskinstance import TaskInstance
@@ -2462,6 +2463,86 @@ class TestTriggerDagRun:
assert run.dag_id == custom_dag_id
+class TestResolveRunOnLatestVersion:
+ @pytest.mark.parametrize("explicit_value", [True, False])
+ def test_explicit_value_takes_precedence(self, explicit_value, dag_maker,
session):
+ """Explicit value always wins, regardless of DAG or global config."""
+
+ with dag_maker("test_resolver_explicit", serialized=True,
session=session):
+ ...
+
+ result = resolve_run_on_latest_version(explicit_value,
"test_resolver_explicit", session)
+ assert result is explicit_value
+
+ def test_dag_level_takes_precedence_over_global(self, dag_maker, session):
+ """DAG-level rerun_with_latest_version=True takes precedence over
global False."""
+
+ with dag_maker("test_resolver_dag", serialized=True, session=session,
rerun_with_latest_version=True):
+ ...
+
+ result = resolve_run_on_latest_version(None, "test_resolver_dag",
session)
+ assert result is True
+
+ def test_global_config_used_when_dag_not_set(self, dag_maker, session):
+ """Falls back to global config when DAG doesn't set
rerun_with_latest_version."""
+
+ with dag_maker("test_resolver_global", serialized=True,
session=session):
+ ...
+
+ with mock.patch("airflow.configuration.conf.getboolean",
return_value=True):
+ result = resolve_run_on_latest_version(None,
"test_resolver_global", session)
+ assert result is True
+
+ def test_default_is_false(self, dag_maker, session):
+ """Returns False when no explicit value, no DAG config, no global
config."""
+
+ with dag_maker("test_resolver_default", serialized=True,
session=session):
+ ...
+
+ result = resolve_run_on_latest_version(None, "test_resolver_default",
session)
+ assert result is False
+
+ def test_fallback_true_for_backfills(self, dag_maker, session):
+ """Backfill callers pass fallback=True to preserve historical
default."""
+
+ with dag_maker("test_resolver_fallback_true", serialized=True,
session=session):
+ ...
+
+ # With no DAG config and no global config set, the fallback kicks in
+ result = resolve_run_on_latest_version(None,
"test_resolver_fallback_true", session, fallback=True)
+ assert result is True
+
+ def test_dag_level_false_overrides_fallback_true(self, dag_maker, session):
+ """DAG-level False takes precedence over a True fallback (backfill
case)."""
+
+ with dag_maker(
+ "test_resolver_dag_false",
+ serialized=True,
+ session=session,
+ rerun_with_latest_version=False,
+ ):
+ ...
+
+ result = resolve_run_on_latest_version(None,
"test_resolver_dag_false", session, fallback=True)
+ assert result is False
+
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_clear_endpoint_invokes_resolver_when_field_omitted(self,
test_client):
+ """Clearing without run_on_latest_version triggers the server-side
resolver."""
+ with mock.patch(
+
"airflow.api_fastapi.core_api.routes.public.dag_run.resolve_run_on_latest_version",
+ return_value=False,
+ ) as mock_resolver:
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+ json={"dry_run": True},
+ )
+ assert response.status_code == 200
+ mock_resolver.assert_called_once()
+ # First positional arg should be None (omitted from request body)
+ assert mock_resolver.call_args.args[0] is None
+
+
class TestWaitDagRun:
# The way we init async engine does not work well with FastAPI app init.
# Creating the engine implicitly creates an event loop, which Airflow does
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
index 3cdaca19084..f3a3eef61c0 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dags.py
@@ -1057,6 +1057,7 @@ class TestDagDetails(TestDagEndpoint):
},
"relative_fileloc": "test_dags.py",
"render_template_as_native_obj": False,
+ "rerun_with_latest_version": None,
"start_date": start_date,
"tags": [],
"template_search_path": None,
@@ -1158,6 +1159,7 @@ class TestDagDetails(TestDagEndpoint):
},
"relative_fileloc": "test_dags.py",
"render_template_as_native_obj": False,
+ "rerun_with_latest_version": None,
"start_date": start_date,
"tags": [],
"template_search_path": None,
diff --git
a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py
b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py
index 8b9982fc47b..b8a70a96d2e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_config.py
@@ -69,6 +69,7 @@ expected_config_response = {
"external_log_name": None,
"theme": THEME,
"multi_team": False,
+ "rerun_with_latest_version": None,
}
diff --git a/airflow-core/tests/unit/cli/commands/test_backfill_command.py
b/airflow-core/tests/unit/cli/commands/test_backfill_command.py
index 79d133f84c8..faab928ddb6 100644
--- a/airflow-core/tests/unit/cli/commands/test_backfill_command.py
+++ b/airflow-core/tests/unit/cli/commands/test_backfill_command.py
@@ -90,6 +90,9 @@ class TestCliBackfill:
repro,
]
)
+ # When --run-on-latest-version is not passed, the resolver kicks in.
+ # With no DAG config and no global config set, the backfill
fallback=True applies,
+ # preserving the historical default.
airflow.cli.commands.backfill_command.create_backfill(self.parser.parse_args(args))
mock_create.assert_called_once_with(
diff --git a/airflow-core/tests/unit/serialization/test_dag_serialization.py
b/airflow-core/tests/unit/serialization/test_dag_serialization.py
index 2b6d8a2edd2..306a7eebb68 100644
--- a/airflow-core/tests/unit/serialization/test_dag_serialization.py
+++ b/airflow-core/tests/unit/serialization/test_dag_serialization.py
@@ -2431,6 +2431,29 @@ class TestStringifiedDAGs:
deserialized_dag = DagSerialization.from_dict(serialized_dag)
assert deserialized_dag.disable_bundle_versioning is expected
+ @pytest.mark.parametrize(
+ ("value", "expected"),
+ [
+ (True, True),
+ (False, False),
+ (None, None),
+ ],
+ )
+ def test_dag_rerun_with_latest_version_roundtrip(self, value, expected):
+ """Test that rerun_with_latest_version survives serialization
roundtrip."""
+ kwargs = {}
+ if value is not None:
+ kwargs["rerun_with_latest_version"] = value
+ dag = DAG(
+ dag_id="test_dag_rerun_with_latest_version_roundtrip",
+ schedule=None,
+ **kwargs,
+ )
+ BaseOperator(task_id="simple_task", dag=dag, start_date=datetime(2019,
8, 1))
+ serialized_dag = DagSerialization.to_dict(dag)
+ deserialized_dag = DagSerialization.from_dict(serialized_dag)
+ assert deserialized_dag.rerun_with_latest_version is expected
+
@pytest.mark.parametrize(
("object_to_serialized", "expected_output"),
[
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index 6886bdd3aa7..2990901b71f 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -189,10 +189,10 @@ class ClearTaskInstancesBody(BaseModel):
run_on_latest_version: Annotated[
bool | None,
Field(
- description="(Experimental) Run on the latest bundle version of
the dag after clearing the task instances.",
+ description="(Experimental) Run on the latest bundle version of
the dag after clearing the task instances. If not specified, falls back to the
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).",
title="Run On Latest Version",
),
- ] = False
+ ] = None
prevent_running_task: Annotated[bool | None, Field(title="Prevent Running
Task")] = False
note: Annotated[Note | None, Field(title="Note")] = None
@@ -319,10 +319,10 @@ class DAGRunClearBody(BaseModel):
run_on_latest_version: Annotated[
bool | None,
Field(
- description="(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run.",
+ description="(Experimental) Run on the latest bundle version of
the Dag after clearing the Dag Run. If not specified, falls back to the
DAG-level ``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``False`` (the
historical default for clear/rerun).",
title="Run On Latest Version",
),
- ] = False
+ ] = None
class DAGRunPatchStates(str, Enum):
@@ -1203,7 +1203,13 @@ class BackfillPostBody(BaseModel):
dag_run_conf: Annotated[dict[str, Any] | None, Field(title="Dag Run
Conf")] = None
reprocess_behavior: ReprocessBehavior | None = "none"
max_active_runs: Annotated[int | None, Field(title="Max Active Runs")] = 10
- run_on_latest_version: Annotated[bool | None, Field(title="Run On Latest
Version")] = True
+ run_on_latest_version: Annotated[
+ bool | None,
+ Field(
+ description="Run on the latest bundle version of the Dag for each
backfilled run. If not specified, falls back to the DAG-level
``rerun_with_latest_version`` parameter, then the ``[core]
rerun_with_latest_version`` config option, and finally ``True`` (the historical
default for backfills).",
+ title="Run On Latest Version",
+ ),
+ ] = None
class BackfillResponse(BaseModel):
@@ -1476,6 +1482,7 @@ class DAGDetailsResponse(BaseModel):
timezone: Annotated[str | None, Field(title="Timezone")] = None
last_parsed: Annotated[datetime | None, Field(title="Last Parsed")] = None
default_args: Annotated[dict[str, Any] | None, Field(title="Default
Args")] = None
+ rerun_with_latest_version: Annotated[bool | None, Field(title="Rerun With
Latest Version")] = None
owner_links: Annotated[dict[str, str] | None, Field(title="Owner Links")]
= None
is_favorite: Annotated[bool | None, Field(title="Is Favorite")] = False
active_runs_count: Annotated[int | None, Field(title="Active Runs Count")]
= 0
diff --git a/task-sdk/src/airflow/sdk/definitions/dag.py
b/task-sdk/src/airflow/sdk/definitions/dag.py
index 18200bd6714..fd47de467fe 100644
--- a/task-sdk/src/airflow/sdk/definitions/dag.py
+++ b/task-sdk/src/airflow/sdk/definitions/dag.py
@@ -417,6 +417,9 @@ class DAG:
:param allowed_run_types: An optional list or single DagRunType specifying
which run types are
permitted for this dag. When set, the scheduler and API will only
allow runs of the specified types.
:param dag_display_name: The display name of the Dag which appears on the
UI.
+ :param rerun_with_latest_version: If True, cleared or rerun tasks will use the latest
+ available bundle version. If False, they use the original bundle version. If None
+ (default), inherits from the global config ``[core] rerun_with_latest_version``.
"""
__serialized_fields: ClassVar[frozenset[str]]
@@ -548,6 +551,9 @@ class DAG:
disable_bundle_versioning: bool = attrs.field(
factory=_config_bool_factory("dag_processor",
"disable_bundle_versioning")
)
+ rerun_with_latest_version: bool | None = attrs.field(
+ default=None, converter=attrs.converters.optional(bool)
+ )
# TODO (GH-52141): This is never used in the sdk dag (it only makes sense
# after this goes through the dag processor), but various parts of the code
@@ -1609,6 +1615,7 @@ if TYPE_CHECKING:
allowed_run_types: DagRunType | Collection[DagRunType] | None = None,
dag_display_name: str | None = None,
disable_bundle_versioning: bool = False,
+ rerun_with_latest_version: bool | None = None,
) -> Callable[[Callable], Callable[..., DAG]]:
"""
Python dag decorator which wraps a function into an Airflow Dag.