This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new 28fb4acd378 Fix DagVersion when clearing tasks with run on latest version (#65835) (#66901)
28fb4acd378 is described below

commit 28fb4acd378a281b39ac49b30931c43fbe3c645a
Author: Rahul Vats <[email protected]>
AuthorDate: Thu May 14 19:56:49 2026 +0530

    Fix DagVersion when clearing tasks with run on latest version (#65835) (#66901)
    
    * Fix DagVersion when clearing tasks with run on latest version
    
    * UI: Merge two useEffects in ClearGroupTaskInstanceDialog
    
    * UI: Add spacing between clear task checkboxes
    
    * UI: Share run on latest version logic for clear dialogs
    
    * UI: Merge two useEffects in ClearTaskInstanceDialog
    
    * Fix CI: apply pre-commit auto-fixes
    
    (cherry picked from commit 8c43743d3089ab08d61ed059b483084519e3df9d)
    
    Co-authored-by: Yuseok Jo <[email protected]>
---
 airflow-core/src/airflow/models/taskinstance.py    |  22 ++-
 .../TaskInstance/ClearGroupTaskInstanceDialog.tsx  |  34 ++++-
 .../Clear/TaskInstance/ClearTaskInstanceDialog.tsx |  38 +++--
 .../Clear/TaskInstance/runOnLatestVersion.test.ts  | 156 +++++++++++++++++++++
 .../Clear/TaskInstance/runOnLatestVersion.ts       |  60 ++++++++
 airflow-core/tests/unit/models/test_cleartasks.py  |  48 +++++++
 6 files changed, 336 insertions(+), 22 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 92270c885f5..15b8d796443 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -378,6 +378,11 @@ def clear_task_instances(
             ti.state = None
             ti.external_executor_id = None
             ti.clear_next_method_args()
+            # Match DagVersion to latest serialized DAG when run_on_latest_version.
+            if run_on_latest_version:
+                latest_dag_version = DagVersion.get_latest_version(ti.dag_id, session=session)
+                if latest_dag_version is not None:
+                    ti.dag_version_id = latest_dag_version.id
             session.merge(ti)
 
     if dag_run_state is not False and tis:
@@ -418,8 +423,7 @@ def clear_task_instances(
                         dr.created_dag_version_id = dag_version.id
                         dr.dag = dr_dag
                         dr.verify_integrity(session=session, dag_version_id=dag_version.id)
-                        for ti in dr.task_instances:
-                            ti.dag_version_id = dag_version.id
+                        # Only cleared TIs get latest dag_version_id above; do not rewrite others.
                 else:
                     dr_dag = scheduler_dagbag.get_dag_for_run(dag_run=dr, session=session)
                 if not dr_dag:
@@ -431,6 +435,20 @@ def clear_task_instances(
                 if dag_run_state == DagRunState.QUEUED:
                     dr.last_scheduling_decision = None
                     dr.start_date = None
+            elif run_on_latest_version:
+                # Queued/running DagRun: update DR to latest version/bundle for workloads that use it.
+                dag_version = DagVersion.get_latest_version(dr.dag_id, session=session)
+                if dag_version and dr.created_dag_version_id != dag_version.id:
+                    dr_dag = scheduler_dagbag.get_latest_version_of_dag(dr.dag_id, session=session)
+                    if not dr_dag:
+                        log.warning("No serialized dag found for dag '%s'", dr.dag_id)
+                    else:
+                        dr.created_dag_version_id = dag_version.id
+                        dr.dag = dr_dag
+                        if not dr_dag.disable_bundle_versioning:
+                            bundle_version = dr.dag_model.bundle_version
+                            if bundle_version is not None:
+                                dr.bundle_version = bundle_version
     for ti in tis:
         ti.context_carrier = new_task_run_carrier(ti.dag_run.context_carrier)
     session.flush()
diff --git 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
index 82fb2ceaf87..f481e18e19d 100644
--- 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
+++ 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearGroupTaskInstanceDialog.tsx
@@ -17,7 +17,7 @@
  * under the License.
  */
 import { Button, Flex, Heading, VStack } from "@chakra-ui/react";
-import { useState } from "react";
+import { useEffect, useRef, useState } from "react";
 import { useTranslation } from "react-i18next";
 import { CgRedo } from "react-icons/cg";
 import { useParams } from "react-router-dom";
@@ -31,6 +31,8 @@ import { useClearTaskInstances } from 
"src/queries/useClearTaskInstances";
 import { useClearTaskInstancesDryRun } from 
"src/queries/useClearTaskInstancesDryRun";
 import { isStatePending, useAutoRefresh } from "src/utils";
 
+import { getRunOnLatestVersionState } from "./runOnLatestVersion";
+
 type Props = {
   readonly onClose: () => void;
   readonly open: boolean;
@@ -56,6 +58,7 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, open, 
taskInstance }: Pr
   const upstream = selectedOptions.includes("upstream");
   const downstream = selectedOptions.includes("downstream");
   const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
+  const userToggledRunOnLatestRef = useRef(false);
 
   const [note, setNote] = useState<string>("");
 
@@ -106,8 +109,21 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, 
open, taskInstance }: Pr
     total_entries: 0,
   };
 
-  const shouldShowBundleVersionOption =
-    dagDetails?.bundle_version !== null && dagDetails?.bundle_version !== "";
+  const { dagVersionsDiffer, shouldShowRunOnLatestOption } = 
getRunOnLatestVersionState({
+    latestBundleVersion: dagDetails?.bundle_version,
+    latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
+    selectedDagVersionNumber: taskInstance.dag_version_number,
+    // Fall back to legacy heuristic when grid summary has no version (older API).
+    useLatestBundleVersionAsFallback: true,
+  });
+
+  useEffect(() => {
+    if (!open) {
+      userToggledRunOnLatestRef.current = false;
+    } else if (!userToggledRunOnLatestRef.current) {
+      setRunOnLatestVersion(dagVersionsDiffer);
+    }
+  }, [open, dagVersionsDiffer]);
 
   return (
     <Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl">
@@ -160,14 +176,18 @@ export const ClearGroupTaskInstanceDialog = ({ onClose, 
open, taskInstance }: Pr
           </Flex>
           <ActionAccordion affectedTasks={affectedTasks} note={note} 
setNote={setNote} />
           <Flex
-            {...(shouldShowBundleVersionOption ? { alignItems: "center" } : 
{})}
-            justifyContent={shouldShowBundleVersionOption ? "space-between" : 
"end"}
+            {...(shouldShowRunOnLatestOption ? { alignItems: "center" } : {})}
+            gap={3}
+            justifyContent={shouldShowRunOnLatestOption ? "space-between" : 
"end"}
             mt={3}
           >
-            {shouldShowBundleVersionOption ? (
+            {shouldShowRunOnLatestOption ? (
               <Checkbox
                 checked={runOnLatestVersion}
-                onCheckedChange={(event) => 
setRunOnLatestVersion(Boolean(event.checked))}
+                onCheckedChange={(event) => {
+                  userToggledRunOnLatestRef.current = true;
+                  setRunOnLatestVersion(Boolean(event.checked));
+                }}
               >
                 
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
               </Checkbox>
diff --git 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
index c7562083c05..94ee5ade2c7 100644
--- 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
+++ 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/ClearTaskInstanceDialog.tsx
@@ -17,7 +17,7 @@
  * under the License.
  */
 import { Button, Flex, Heading, useDisclosure, VStack } from 
"@chakra-ui/react";
-import { useState } from "react";
+import { useEffect, useRef, useState } from "react";
 import { useTranslation } from "react-i18next";
 import { CgRedo } from "react-icons/cg";
 
@@ -33,6 +33,7 @@ import { usePatchTaskInstance } from 
"src/queries/usePatchTaskInstance";
 import { isStatePending, useAutoRefresh } from "src/utils";
 
 import ClearTaskInstanceConfirmationDialog from 
"./ClearTaskInstanceConfirmationDialog";
+import { getRunOnLatestVersionState } from "./runOnLatestVersion";
 
 type Props = {
   readonly onClose: () => void;
@@ -63,6 +64,7 @@ const ClearTaskInstanceDialog = ({ onClose: onCloseDialog, 
open: openDialog, tas
   const upstream = selectedOptions.includes("upstream");
   const downstream = selectedOptions.includes("downstream");
   const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
+  const userToggledRunOnLatestRef = useRef(false);
   const [preventRunningTask, setPreventRunningTask] = useState(true);
 
   const [note, setNote] = useState<string | null>(taskInstance.note);
@@ -107,14 +109,20 @@ const ClearTaskInstanceDialog = ({ onClose: 
onCloseDialog, open: openDialog, tas
     total_entries: 0,
   };
 
-  // Check if bundle versions are different
-  const currentDagBundleVersion = dagDetails?.bundle_version;
-  const taskInstanceDagVersionBundleVersion = 
taskInstance.dag_version?.bundle_version;
-  const bundleVersionsDiffer = currentDagBundleVersion !== 
taskInstanceDagVersionBundleVersion;
-  const shouldShowBundleVersionOption =
-    bundleVersionsDiffer &&
-    taskInstanceDagVersionBundleVersion !== null &&
-    taskInstanceDagVersionBundleVersion !== "";
+  const { dagVersionsDiffer, shouldShowRunOnLatestOption } = 
getRunOnLatestVersionState({
+    latestBundleVersion: dagDetails?.bundle_version,
+    latestDagVersionNumber: dagDetails?.latest_dag_version?.version_number,
+    selectedBundleVersion: taskInstance.dag_version?.bundle_version,
+    selectedDagVersionNumber: taskInstance.dag_version?.version_number,
+  });
+
+  useEffect(() => {
+    if (!openDialog) {
+      userToggledRunOnLatestRef.current = false;
+    } else if (!userToggledRunOnLatestRef.current) {
+      setRunOnLatestVersion(dagVersionsDiffer);
+    }
+  }, [openDialog, dagVersionsDiffer]);
 
   return (
     <>
@@ -170,14 +178,18 @@ const ClearTaskInstanceDialog = ({ onClose: 
onCloseDialog, open: openDialog, tas
             </Flex>
             <ActionAccordion affectedTasks={affectedTasks} note={note} 
setNote={setNote} />
             <Flex
-              {...(shouldShowBundleVersionOption ? { alignItems: "center" } : 
{})}
-              justifyContent={shouldShowBundleVersionOption ? "space-between" 
: "end"}
+              {...(shouldShowRunOnLatestOption ? { alignItems: "center" } : 
{})}
+              gap={3}
+              justifyContent={shouldShowRunOnLatestOption ? "space-between" : 
"end"}
               mt={3}
             >
-              {shouldShowBundleVersionOption ? (
+              {shouldShowRunOnLatestOption ? (
                 <Checkbox
                   checked={runOnLatestVersion}
-                  onCheckedChange={(event) => 
setRunOnLatestVersion(Boolean(event.checked))}
+                  onCheckedChange={(event) => {
+                    userToggledRunOnLatestRef.current = true;
+                    setRunOnLatestVersion(Boolean(event.checked));
+                  }}
                 >
                   
{translate("dags:runAndTaskActions.options.runOnLatestVersion")}
                 </Checkbox>
diff --git 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.test.ts
 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.test.ts
new file mode 100644
index 00000000000..2936e2b9191
--- /dev/null
+++ 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.test.ts
@@ -0,0 +1,156 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { describe, expect, it } from "vitest";
+
+import { getRunOnLatestVersionState } from "./runOnLatestVersion";
+
+describe("getRunOnLatestVersionState", () => {
+  it.each([
+    {
+      expectedDagVersionsDiffer: true,
+      expectedShouldShowRunOnLatestOption: true,
+      latestDagVersionNumber: 3,
+      name: "shows and defaults on when DAG version numbers differ",
+      selectedDagVersionNumber: 2,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestDagVersionNumber: 3,
+      name: "does not show when DAG version numbers match and there is no bundle difference",
+      selectedDagVersionNumber: 3,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestDagVersionNumber: undefined,
+      name: "does not treat a missing latest DAG version number as different",
+      selectedDagVersionNumber: 3,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestDagVersionNumber: 3,
+      name: "does not treat a missing selected DAG version number as different",
+      selectedDagVersionNumber: undefined,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: "bundle-a",
+      name: "does not show when task bundle versions match",
+      selectedBundleVersion: "bundle-a",
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: true,
+      latestBundleVersion: "bundle-b",
+      name: "shows when task bundle versions differ",
+      selectedBundleVersion: "bundle-a",
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: true,
+      latestBundleVersion: null,
+      name: "shows when task bundle version differs from a known null latest bundle",
+      selectedBundleVersion: "bundle-a",
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: undefined,
+      name: "does not compare task bundle version before the latest bundle is loaded",
+      selectedBundleVersion: "bundle-a",
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: "bundle-b",
+      name: "does not show for task bundle comparison when selected bundle is null",
+      selectedBundleVersion: null,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: "bundle-b",
+      name: "does not show for task bundle comparison when selected bundle is empty",
+      selectedBundleVersion: "",
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: "bundle-b",
+      name: "does not show for task bundle comparison when selected bundle is missing",
+      selectedBundleVersion: undefined,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: true,
+      latestBundleVersion: "bundle-b",
+      name: "shows for group fallback when latest bundle is available",
+      selectedBundleVersion: undefined,
+      useLatestBundleVersionAsFallback: true,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: null,
+      name: "does not show for group fallback when latest bundle is null",
+      useLatestBundleVersionAsFallback: true,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: "",
+      name: "does not show for group fallback when latest bundle is empty",
+      useLatestBundleVersionAsFallback: true,
+    },
+    {
+      expectedDagVersionsDiffer: false,
+      expectedShouldShowRunOnLatestOption: false,
+      latestBundleVersion: undefined,
+      name: "does not show for group fallback when latest bundle is missing",
+      useLatestBundleVersionAsFallback: true,
+    },
+  ])(
+    "$name",
+    ({
+      expectedDagVersionsDiffer,
+      expectedShouldShowRunOnLatestOption,
+      latestBundleVersion,
+      latestDagVersionNumber,
+      selectedBundleVersion,
+      selectedDagVersionNumber,
+      useLatestBundleVersionAsFallback,
+    }) => {
+      expect(
+        getRunOnLatestVersionState({
+          latestBundleVersion,
+          latestDagVersionNumber,
+          selectedBundleVersion,
+          selectedDagVersionNumber,
+          useLatestBundleVersionAsFallback,
+        }),
+      ).toEqual({
+        dagVersionsDiffer: expectedDagVersionsDiffer,
+        shouldShowRunOnLatestOption: expectedShouldShowRunOnLatestOption,
+      });
+    },
+  );
+});
diff --git 
a/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.ts
 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.ts
new file mode 100644
index 00000000000..7bbd2daf904
--- /dev/null
+++ 
b/airflow-core/src/airflow/ui/src/components/Clear/TaskInstance/runOnLatestVersion.ts
@@ -0,0 +1,60 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+type RunOnLatestVersionParams = {
+  readonly latestBundleVersion?: string | null;
+  readonly latestDagVersionNumber?: number | null;
+  readonly selectedBundleVersion?: string | null;
+  readonly selectedDagVersionNumber?: number | null;
+  readonly useLatestBundleVersionAsFallback?: boolean;
+};
+
+type RunOnLatestVersionState = {
+  readonly dagVersionsDiffer: boolean;
+  readonly shouldShowRunOnLatestOption: boolean;
+};
+
+const hasBundleVersion = (bundleVersion: string | null | undefined) =>
+  bundleVersion !== undefined && bundleVersion !== null && bundleVersion !== "";
+
+export const getRunOnLatestVersionState = ({
+  latestBundleVersion,
+  latestDagVersionNumber,
+  selectedBundleVersion,
+  selectedDagVersionNumber,
+  useLatestBundleVersionAsFallback = false,
+}: RunOnLatestVersionParams): RunOnLatestVersionState => {
+  const dagVersionsDiffer =
+    latestDagVersionNumber !== undefined &&
+    latestDagVersionNumber !== null &&
+    selectedDagVersionNumber !== undefined &&
+    selectedDagVersionNumber !== null &&
+    latestDagVersionNumber !== selectedDagVersionNumber;
+
+  const shouldShowForBundleVersion = useLatestBundleVersionAsFallback
+    ? hasBundleVersion(latestBundleVersion)
+    : latestBundleVersion !== undefined &&
+      hasBundleVersion(selectedBundleVersion) &&
+      latestBundleVersion !== selectedBundleVersion;
+
+  return {
+    dagVersionsDiffer,
+    shouldShowRunOnLatestOption: dagVersionsDiffer || shouldShowForBundleVersion,
+  };
+};
diff --git a/airflow-core/tests/unit/models/test_cleartasks.py b/airflow-core/tests/unit/models/test_cleartasks.py
index 54dffde7f41..e82cb46f0f4 100644
--- a/airflow-core/tests/unit/models/test_cleartasks.py
+++ b/airflow-core/tests/unit/models/test_cleartasks.py
@@ -739,6 +739,54 @@ class TestClearTasks:
             for ti in dr.task_instances:
                 assert ti.dag_version_id == old_dag_version.id
 
+    def test_clear_subset_run_on_latest_version_only_updates_cleared_tis(self, dag_maker, session):
+        """run_on_latest_version on a finished DR must not rewrite dag_version_id on TIs that were not cleared."""
+        with dag_maker(
+            "test_clear_subset_latest",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v1",
+        ) as dag:
+            task0 = EmptyOperator(task_id="0")
+            task1 = EmptyOperator(task_id="1")
+        dr = dag_maker.create_dagrun(
+            state=State.RUNNING,
+            run_type=DagRunType.SCHEDULED,
+        )
+
+        old_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        ti0, ti1 = sorted(dr.task_instances, key=lambda ti: ti.task_id)
+        ti0.refresh_from_task(dag.get_task("0"))
+        ti1.refresh_from_task(dag.get_task("1"))
+
+        run_task_instance(ti0, task0)
+        run_task_instance(ti1, task1)
+        dr.state = DagRunState.SUCCESS
+        session.merge(dr)
+        session.flush()
+
+        with dag_maker(
+            "test_clear_subset_latest",
+            start_date=DEFAULT_DATE,
+            end_date=DEFAULT_DATE + datetime.timedelta(days=10),
+            catchup=True,
+            bundle_version="v2",
+        ):
+            EmptyOperator(task_id="0")
+            EmptyOperator(task_id="1")
+        new_dag_version = DagVersion.get_latest_version(dr.dag_id)
+        assert old_dag_version.id != new_dag_version.id
+
+        clear_task_instances([ti0], session, run_on_latest_version=True)
+        session.commit()
+
+        dr_after = session.scalar(select(DagRun).where(DagRun.dag_id == dr.dag_id))
+        tis = {ti.task_id: ti for ti in dr_after.task_instances}
+        assert tis["0"].dag_version_id == new_dag_version.id
+        assert tis["1"].dag_version_id == old_dag_version.id
+        assert dr_after.created_dag_version_id == new_dag_version.id
+
     def test_clear_only_new_tasks(self, dag_maker, session):
         """Test that only_new queues only newly added tasks without clearing existing ones."""
 

Reply via email to