This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new 28fb4acd378 Fix DagVersion when clearing tasks with run on latest version (#65835) (#66901)
28fb4acd378 is described below
commit 28fb4acd378a281b39ac49b30931c43fbe3c645a
Author: Rahul Vats <[email protected]>
AuthorDate: Thu May 14 19:56:49 2026 +0530
Fix DagVersion when clearing tasks with run on latest version (#65835) (#66901)
* Fix DagVersion when clearing tasks with run on latest version
* UI: Merge two useEffects in ClearGroupTaskInstanceDialog
* UI: Add spacing between clear task checkboxes
* UI: Share run on latest version logic for clear dialogs
* UI: Merge two useEffects in ClearTaskInstanceDialog
* Fix CI: apply pre-commit auto-fixes
(cherry picked from commit 8c43743d3089ab08d61ed059b483084519e3df9d)
Co-authored-by: Yuseok Jo <[email protected]>
---
airflow-core/src/airflow/models/taskinstance.py | 22 ++-
.../TaskInstance/ClearGroupTaskInstanceDialog.tsx | 34 ++++-
.../Clear/TaskInstance/ClearTaskInstanceDialog.tsx | 38 +++--
.../Clear/TaskInstance/runOnLatestVersion.test.ts | 156 +++++++++++++++++++++
.../Clear/TaskInstance/runOnLatestVersion.ts | 60 ++++++++
airflow-core/tests/unit/models/test_cleartasks.py | 48 +++++++
6 files changed, 336 insertions(+), 22 deletions(-)
diff --git a/airflow-core/src/airflow/models/taskinstance.py
b/airflow-core/src/airflow/models/taskinstance.py
index 92270c885f5..15b8d796443 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -378,6 +378,11 @@ def clear_task_instances(
ti.state = None
ti.external_executor_id = None
ti.clear_next_method_args()
+ # Match DagVersion to latest serialized DAG when run_on_latest_version.
+ if run_on_latest_version:
+ latest_dag_version = DagVersion.get_latest_version(ti.dag_id,
session=session)
+ if latest_dag_version is not None:
+ ti.dag_version_id = latest_dag_version.id
session.merge(ti)
if dag_run_state is not False and tis:
@@ -418,8 +423,7 @@ def clear_task_instances(
dr.created_dag_version_id = dag_version.id
dr.dag = dr_dag
dr.verify_integrity(session=session,
dag_version_id=dag_version.id)
- for ti in dr.task_instances:
- ti.dag_version_id = dag_version.id
+ # Only cleared TIs get latest dag_version_id above; do not rewrite others.
else:
dr_dag = scheduler_dagbag.get_dag_for_run(dag_run=dr,
session=session)
if not dr_dag:
@@ -431,6 +435,20 @@ def clear_task_instances(
if dag_run_state == DagRunState.QUEUED:
dr.last_scheduling_decision = None
dr.start_date = None
+ elif run_on_latest_version:
+ # Queued/running DagRun: update DR to latest version/bundle for workloads that use it.
+ dag_version = DagVersion.get_latest_version(dr.dag_id,
session=session)
+ if dag_version and dr.created_dag_version_id != dag_version.id:
+ dr_dag =
scheduler_dagbag.get_latest_version_of_dag(dr.dag_id, session=session)
+ if not dr_dag:
+ log.warning("No serialized dag found for dag '%s'",
dr.dag_id)
+ else:
+ dr.created_dag_version_id = dag_version.id
+ dr.dag = dr_dag
+ if not dr_dag.disable_bundle_versioning:
+ bundle_version = dr.dag_model.bundle_version
+ if bundle_version is not None:
+ dr.bundle_version = bundle_version
for ti in tis:
ti.context_carrier = new_task_run_carrier(ti.dag_run.context_carrier)
session.flush()
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
index 82fb2ceaf87..f481e18e19d 100644
---
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
@@ -17,7 +17,7 @@
* under the License.
*/
import { Button, Flex, Heading, VStack } from "@chakra-ui/react";
-import { useState } from "react";
+import { useEffect, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
import { useParams } from "react-router-dom";
@@ -31,6 +31,8 @@ import { useClearTaskInstances } from
"src/queries/useClearTaskInstances";
import { useClearTaskInstancesDryRun } from
"src/queries/useClearTaskInstancesDryRun";
import { isStatePending, useAutoRefresh } from "src/utils";
+import { getRunOnLatestVersionState } from "./runOnLatestVersion";
+
type Props = {
readonly onClose: () => void;
readonly open: boolean;
@@ -56,6 +58,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open,
taskInstance }: Pr
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
+ const userToggledRunOnLatestRef = useRef(false);
const [note, setNote] = useState<string>("");
@@ -106,8 +109,21 @@ export const ClearGroupTaskInstanceDialog = ({ onClose,
open, taskInstance }: Pr
total_entries: 0,
};
- const shouldShowBundleVersionOption =
- dagDetails?.bundle_version !== null && dagDetails?.bundle_version !== "";
+ const { dagVersionsDiffer, shouldShowRunOnLatestOption } =
getRunOnLatestVersionState({
+ latestBundleVersion: dagDetails?.bundle_version,
+ latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
+ selectedDagVersionNumber: taskInstance.dag_version_number,
+ // Fall back to legacy heuristic when grid summary has no version (older API).
+ useLatestBundleVersionAsFallback: true,
+ });
+
+ useEffect(() => {
+ if (!open) {
+ userToggledRunOnLatestRef.current = false;
+ } else if (!userToggledRunOnLatestRef.current) {
+ setRunOnLatestVersion(dagVersionsDiffer);
+ }
+ }, [open, dagVersionsDiffer]);
return (
<Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl">
@@ -160,14 +176,18 @@ export const ClearGroupTaskInstanceDialog = ({ onClose,
open, taskInstance }: Pr
</Flex>
<ActionAccordion affectedTasks={affectedTasks} note={note}
setNote={setNote} />
<Flex
- {...(shouldShowBundleVersionOption ? { alignItems: "center" } :
{})}
- justifyContent={shouldShowBundleVersionOption ? "space-between" :
"end"}
+ {...(shouldShowRunOnLatestOption ? { alignItems: "center" } : {})}
+ gap={3}
+ justifyContent={shouldShowRunOnLatestOption ? "space-between" :
"end"}
mt={3}
>
- {shouldShowBundleVersionOption ? (
+ {shouldShowRunOnLatestOption ? (
<Checkbox
checked={runOnLatestVersion}
- onCheckedChange={(event) =>
setRunOnLatestVersion(Boolean(event.checked))}
+ onCheckedChange={(event) => {
+ userToggledRunOnLatestRef.current = true;
+ setRunOnLatestVersion(Boolean(event.checked));
+ }}
>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
index c7562083c05..94ee5ade2c7 100644
---
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
@@ -17,7 +17,7 @@
* under the License.
*/
import { Button, Flex, Heading, useDisclosure, VStack } from
"@chakra-ui/react";
-import { useState } from "react";
+import { useEffect, useRef, useState } from "react";
import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
@@ -33,6 +33,7 @@ import { usePatchTaskInstance } from
"src/queries/usePatchTaskInstance";
import { isStatePending, useAutoRefresh } from "src/utils";
import ClearTaskInstanceConfirmationDialog from
"./ClearTaskInstanceConfirmationDialog";
+import { getRunOnLatestVersionState } from "./runOnLatestVersion";
type Props = {
readonly onClose: () => void;
@@ -63,6 +64,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog,
open: openDialog, tas
const upstream = selectedOptions.includes("upstream");
const downstream = selectedOptions.includes("downstream");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
+ const userToggledRunOnLatestRef = useRef(false);
const [preventRunningTask, setPreventRunningTask] = useState(true);
const [note, setNote] = useState<string | null>(taskInstance.note);
@@ -107,14 +109,20 @@ const ClearTaskInstanceDialog = ({ onClose:
onCloseDialog, open: openDialog, tas
total_entries: 0,
};
- // Check if bundle versions are different
- const currentDagBundleVersion = dagDetails?.bundle_version;
- const taskInstanceDagVersionBundleVersion =
taskInstance.dag_version?.bundle_version;
- const bundleVersionsDiffer = currentDagBundleVersion !==
taskInstanceDagVersionBundleVersion;
- const shouldShowBundleVersionOption =
- bundleVersionsDiffer &&
- taskInstanceDagVersionBundleVersion !== null &&
- taskInstanceDagVersionBundleVersion !== "";
+ const { dagVersionsDiffer, shouldShowRunOnLatestOption } =
getRunOnLatestVersionState({
+ latestBundleVersion: dagDetails?.bundle_version,
+ latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
+ selectedBundleVersion: taskInstance.dag_version?.bundle_version,
+ selectedDagVersionNumber: taskInstance.dag_version?.version_number,
+ });
+
+ useEffect(() => {
+ if (!openDialog) {
+ userToggledRunOnLatestRef.current = false;
+ } else if (!userToggledRunOnLatestRef.current) {
+ setRunOnLatestVersion(dagVersionsDiffer);
+ }
+ }, [openDialog, dagVersionsDiffer]);
return (
<>
@@ -170,14 +178,18 @@ const ClearTaskInstanceDialog = ({ onClose:
onCloseDialog, open: openDialog, tas
</Flex>
<ActionAccordion affectedTasks={affectedTasks} note={note}
setNote={setNote} />
<Flex
- {...(shouldShowBundleVersionOption ? { alignItems: "center" } :
{})}
- justifyContent={shouldShowBundleVersionOption ? "space-between"
: "end"}
+ {...(shouldShowRunOnLatestOption ? { alignItems: "center" } :
{})}
+ gap={3}
+ justifyContent={shouldShowRunOnLatestOption ? "space-between" :
"end"}
mt={3}
>
- {shouldShowBundleVersionOption ? (
+ {shouldShowRunOnLatestOption ? (
<Checkbox
checked={runOnLatestVersion}
- onCheckedChange={(event) =>
setRunOnLatestVersion(Boolean(event.checked))}
+ onCheckedChange={(event) => {
+ userToggledRunOnLatestRef.current = true;
+ setRunOnLatestVersion(Boolean(event.checked));
+ }}
>
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
</Checkbox>
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.test.ts
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.test.ts
new file mode 100644
index 00000000000..2936e2b9191
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.test.ts
@@ -0,0 +1,156 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { describe, expect, it } from "vitest";
+
+import { getRunOnLatestVersionState } from "./runOnLatestVersion";
+
+describe("getRunOnLatestVersionState", () => {
+ it.each([
+ {
+ expectedDagVersionsDiffer: true,
+ expectedShouldShowRunOnLatestOption: true,
+ latestDagVersionNumber: 3,
+ name: "shows and defaults on when DAG version numbers differ",
+ selectedDagVersionNumber: 2,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestDagVersionNumber: 3,
+ name: "does not show when DAG version numbers match and there is no
bundle difference",
+ selectedDagVersionNumber: 3,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestDagVersionNumber: undefined,
+ name: "does not treat a missing latest DAG version number as different",
+ selectedDagVersionNumber: 3,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestDagVersionNumber: 3,
+ name: "does not treat a missing selected DAG version number as
different",
+ selectedDagVersionNumber: undefined,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: "bundle-a",
+ name: "does not show when task bundle versions match",
+ selectedBundleVersion: "bundle-a",
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: true,
+ latestBundleVersion: "bundle-b",
+ name: "shows when task bundle versions differ",
+ selectedBundleVersion: "bundle-a",
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: true,
+ latestBundleVersion: null,
+ name: "shows when task bundle version differs from a known null latest
bundle",
+ selectedBundleVersion: "bundle-a",
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: undefined,
+ name: "does not compare task bundle version before the latest bundle is
loaded",
+ selectedBundleVersion: "bundle-a",
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: "bundle-b",
+ name: "does not show for task bundle comparison when selected bundle is
null",
+ selectedBundleVersion: null,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: "bundle-b",
+ name: "does not show for task bundle comparison when selected bundle is
empty",
+ selectedBundleVersion: "",
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: "bundle-b",
+ name: "does not show for task bundle comparison when selected bundle is
missing",
+ selectedBundleVersion: undefined,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: true,
+ latestBundleVersion: "bundle-b",
+ name: "shows for group fallback when latest bundle is available",
+ selectedBundleVersion: undefined,
+ useLatestBundleVersionAsFallback: true,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: null,
+ name: "does not show for group fallback when latest bundle is null",
+ useLatestBundleVersionAsFallback: true,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: "",
+ name: "does not show for group fallback when latest bundle is empty",
+ useLatestBundleVersionAsFallback: true,
+ },
+ {
+ expectedDagVersionsDiffer: false,
+ expectedShouldShowRunOnLatestOption: false,
+ latestBundleVersion: undefined,
+ name: "does not show for group fallback when latest bundle is missing",
+ useLatestBundleVersionAsFallback: true,
+ },
+ ])(
+ "$name",
+ ({
+ expectedDagVersionsDiffer,
+ expectedShouldShowRunOnLatestOption,
+ latestBundleVersion,
+ latestDagVersionNumber,
+ selectedBundleVersion,
+ selectedDagVersionNumber,
+ useLatestBundleVersionAsFallback,
+ }) => {
+ expect(
+ getRunOnLatestVersionState({
+ latestBundleVersion,
+ latestDagVersionNumber,
+ selectedBundleVersion,
+ selectedDagVersionNumber,
+ useLatestBundleVersionAsFallback,
+ }),
+ ).toEqual({
+ dagVersionsDiffer: expectedDagVersionsDiffer,
+ shouldShowRunOnLatestOption: expectedShouldShowRunOnLatestOption,
+ });
+ },
+ );
+});
diff --git
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.ts
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.ts
new file mode 100644
index 00000000000..7bbd2daf904
--- /dev/null
+++
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.ts
@@ -0,0 +1,60 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+type RunOnLatestVersionParams = {
+ readonly latestBundleVersion?: string | null;
+ readonly latestDagVersionNumber?: number | null;
+ readonly selectedBundleVersion?: string | null;
+ readonly selectedDagVersionNumber?: number | null;
+ readonly useLatestBundleVersionAsFallback?: boolean;
+};
+
+type RunOnLatestVersionState = {
+ readonly dagVersionsDiffer: boolean;
+ readonly shouldShowRunOnLatestOption: boolean;
+};
+
+const hasBundleVersion = (bundleVersion: string | null | undefined) =>
+ bundleVersion !== undefined && bundleVersion !== null && bundleVersion !==
"";
+
+export const getRunOnLatestVersionState = ({
+ latestBundleVersion,
+ latestDagVersionNumber,
+ selectedBundleVersion,
+ selectedDagVersionNumber,
+ useLatestBundleVersionAsFallback = false,
+}: RunOnLatestVersionParams): RunOnLatestVersionState => {
+ const dagVersionsDiffer =
+ latestDagVersionNumber !== undefined &&
+ latestDagVersionNumber !== null &&
+ selectedDagVersionNumber !== undefined &&
+ selectedDagVersionNumber !== null &&
+ latestDagVersionNumber !== selectedDagVersionNumber;
+
+ const shouldShowForBundleVersion = useLatestBundleVersionAsFallback
+ ? hasBundleVersion(latestBundleVersion)
+ : latestBundleVersion !== undefined &&
+ hasBundleVersion(selectedBundleVersion) &&
+ latestBundleVersion !== selectedBundleVersion;
+
+ return {
+ dagVersionsDiffer,
+ shouldShowRunOnLatestOption: dagVersionsDiffer ||
shouldShowForBundleVersion,
+ };
+};
diff --git a/airflow-core/tests/unit/models/test_cleartasks.py
b/airflow-core/tests/unit/models/test_cleartasks.py
index 54dffde7f41..e82cb46f0f4 100644
--- a/airflow-core/tests/unit/models/test_cleartasks.py
+++ b/airflow-core/tests/unit/models/test_cleartasks.py
@@ -739,6 +739,54 @@ class TestClearTasks:
for ti in dr.task_instances:
assert ti.dag_version_id == old_dag_version.id
+ def test_clear_subset_run_on_latest_version_only_updates_cleared_tis(self,
dag_maker, session):
+ """run_on_latest_version on a finished DR must not rewrite
dag_version_id on TIs that were not cleared."""
+ with dag_maker(
+ "test_clear_subset_latest",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v1",
+ ) as dag:
+ task0 = EmptyOperator(task_id="0")
+ task1 = EmptyOperator(task_id="1")
+ dr = dag_maker.create_dagrun(
+ state=State.RUNNING,
+ run_type=DagRunType.SCHEDULED,
+ )
+
+ old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+ ti0.refresh_from_task(dag.get_task("0"))
+ ti1.refresh_from_task(dag.get_task("1"))
+
+ run_task_instance(ti0, task0)
+ run_task_instance(ti1, task1)
+ dr.state = DagRunState.SUCCESS
+ session.merge(dr)
+ session.flush()
+
+ with dag_maker(
+ "test_clear_subset_latest",
+ start_date=DEFAULT_DATE,
+ end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+ catchup=True,
+ bundle_version="v2",
+ ):
+ EmptyOperator(task_id="0")
+ EmptyOperator(task_id="1")
+ new_dag_version = DagVersion.get_latest_version(dr.dag_id)
+ assert old_dag_version.id != new_dag_version.id
+
+ clear_task_instances([ti0], session, run_on_latest_version=True)
+ session.commit()
+
+ dr_after = session.scalar(select(DagRun).where(DagRun.dag_id ==
dr.dag_id))
+ tis = {ti.task_id: ti for ti in dr_after.task_instances}
+ assert tis["0"].dag_version_id == new_dag_version.id
+ assert tis["1"].dag_version_id == old_dag_version.id
+ assert dr_after.created_dag_version_id == new_dag_version.id
+
def test_clear_only_new_tasks(self, dag_maker, session):
"""Test that only_new queues only newly added tasks without clearing
existing ones."""