This is an automated email from the ASF dual-hosted git repository.

bbovenzi pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-2-test by this push:
     new f0ec21a364b feat: enable queue up new tasks (#63484) (#66869)
f0ec21a364b is described below

commit f0ec21a364b3ade2ba3e6131346f2e528f63a884
Author: Rahul Vats <[email protected]>
AuthorDate: Sat May 16 00:40:12 2026 +0530

    feat: enable queue up new tasks (#63484) (#66869)
    
    * feat: enable queue up new tasks
    
    * chore: remove redundant disable flag from button
    
    * feat: create and pass NewTaskCollectionResponse
    
    * cleanup: unused translations
    
    * fix: remove additional return type and simplify code
    
    * remove redundancy
    
    * fix: remove unused var
    
    * fix: mypy and tests
    
    * fix: more mypy
    
    * cleanup
    
    * fix: mypy
    
    * fix: pnpm lint
    
    * cleanup
    
    (cherry picked from commit dae48ba722c420010de2c831587d005923644fc3)
    
    Co-authored-by: Oscar Ligthart <[email protected]>
---
 .../api_fastapi/core_api/datamodels/dag_run.py     | 14 +++-
 .../core_api/datamodels/task_instances.py          | 31 ++++++++
 .../core_api/openapi/v2-rest-api-generated.yaml    | 42 ++++++++++-
 .../api_fastapi/core_api/routes/public/dag_run.py  | 31 +++++---
 airflow-core/src/airflow/models/taskinstance.py    |  9 ++-
 .../src/airflow/serialization/definitions/dag.py   | 17 +++++
 .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 50 +++++++++++++
 .../airflow/ui/openapi-gen/requests/types.gen.ts   | 24 ++++++-
 .../ui/src/components/Clear/Run/ClearRunDialog.tsx | 29 +++++---
 .../airflow/ui/src/queries/useClearDagRunDryRun.ts |  7 +-
 .../core_api/routes/public/test_dag_run.py         | 84 ++++++++++++++++++++++
 .../src/airflowctl/api/datamodels/generated.py     | 25 +++++++
 12 files changed, 337 insertions(+), 26 deletions(-)

diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index e88aad3d6d1..510015a9193 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -20,7 +20,7 @@ from __future__ import annotations
 from collections.abc import Iterable
 from datetime import datetime
 from enum import Enum
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
 
 from pydantic import AliasPath, AwareDatetime, Field, NonNegativeInt, 
model_validator
 
@@ -55,11 +55,23 @@ class DAGRunClearBody(StrictBaseModel):
 
     dry_run: bool = True
     only_failed: bool = False
+    only_new: bool = Field(
+        default=False,
+        description="Only queue newly added tasks in the latest DAG version 
without clearing existing tasks.",
+    )
     run_on_latest_version: bool = Field(
         default=False,
         description="(Experimental) Run on the latest bundle version of the 
Dag after clearing the Dag Run.",
     )
 
+    @model_validator(mode="before")
+    @classmethod
+    def validate_model(cls, data: Any) -> Any:
+        """Validate clear DAG run form."""
+        if data.get("only_new") and data.get("only_failed"):
+            raise ValueError("only_new and only_failed are mutually exclusive")
+        return data
+
 
 class DAGRunResponse(BaseModel):
     """Dag Run serializer for responses."""
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py 
b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 979218f3151..ae0e6c31f47 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -25,9 +25,11 @@ from pydantic import (
     AliasPath,
     AwareDatetime,
     BeforeValidator,
+    Discriminator,
     Field,
     NonNegativeInt,
     StringConstraints,
+    Tag,
     ValidationError,
     field_validator,
     model_validator,
@@ -40,6 +42,13 @@ from airflow.api_fastapi.core_api.datamodels.trigger import 
TriggerResponse
 from airflow.utils.state import TaskInstanceState
 
 
+class NewTaskResponse(BaseModel):
+    """Lightweight response for new tasks that don't have TaskInstances yet."""
+
+    task_id: str
+    task_display_name: str
+
+
 class TaskInstanceResponse(BaseModel):
     """TaskInstance serializer for responses."""
 
@@ -112,6 +121,28 @@ class TaskInstanceCollectionResponse(BaseModel):
     )
 
 
+def _task_instance_discriminator(v: Any) -> str:
+    """Discriminate between TaskInstanceResponse and NewTaskResponse in the 
union."""
+    if isinstance(v, NewTaskResponse):
+        return "new"
+    if isinstance(v, dict):
+        return "new" if "id" not in v else "full"
+    # ORM objects and TaskInstanceResponse instances
+    return "full"
+
+
+class ClearTaskInstanceCollectionResponse(BaseModel):
+    """Response for clear dag run dry run, which may contain new tasks without 
full TaskInstance data."""
+
+    task_instances: Iterable[
+        Annotated[
+            Annotated[TaskInstanceResponse, Tag("full")] | 
Annotated[NewTaskResponse, Tag("new")],
+            Discriminator(_task_instance_discriminator),
+        ]
+    ]
+    total_entries: int
+
+
 class TaskDependencyResponse(BaseModel):
     """Task Dependency serializer for responses."""
 
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 767cdfb29d7..9d8b59ecaea 100644
--- 
a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ 
b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2142,7 +2142,7 @@ paths:
             application/json:
               schema:
                 anyOf:
-                - $ref: '#/components/schemas/TaskInstanceCollectionResponse'
+                - $ref: 
'#/components/schemas/ClearTaskInstanceCollectionResponse'
                 - $ref: '#/components/schemas/DAGRunResponse'
                 title: Response Clear Dag Run
         '401':
@@ -11072,6 +11072,25 @@ components:
       - action
       - entities
       title: BulkUpdateAction[VariableBody]
+    ClearTaskInstanceCollectionResponse:
+      properties:
+        task_instances:
+          items:
+            oneOf:
+            - $ref: '#/components/schemas/TaskInstanceResponse'
+            - $ref: '#/components/schemas/NewTaskResponse'
+          type: array
+          title: Task Instances
+        total_entries:
+          type: integer
+          title: Total Entries
+      type: object
+      required:
+      - task_instances
+      - total_entries
+      title: ClearTaskInstanceCollectionResponse
+      description: Response for clear dag run dry run, which may contain new 
tasks
+        without full TaskInstance data.
     ClearTaskInstancesBody:
       properties:
         dry_run:
@@ -11852,6 +11871,12 @@ components:
           type: boolean
           title: Only Failed
           default: false
+        only_new:
+          type: boolean
+          title: Only New
+          description: Only queue newly added tasks in the latest DAG version 
without
+            clearing existing tasks.
+          default: false
         run_on_latest_version:
           type: boolean
           title: Run On Latest Version
@@ -13271,6 +13296,21 @@ components:
       type: object
       title: MaterializeAssetBody
       description: Materialize asset request.
+    NewTaskResponse:
+      properties:
+        task_id:
+          type: string
+          title: Task Id
+        task_display_name:
+          type: string
+          title: Task Display Name
+      type: object
+      required:
+      - task_id
+      - task_display_name
+      title: NewTaskResponse
+      description: Lightweight response for new tasks that don't have 
TaskInstances
+        yet.
     PatchTaskInstanceBody:
       properties:
         new_state:
diff --git 
a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py 
b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index e168bd782f3..5e33c844aeb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -85,7 +85,8 @@ from airflow.api_fastapi.core_api.datamodels.dag_run import (
     TriggerDAGRunPostBody,
 )
 from airflow.api_fastapi.core_api.datamodels.task_instances import (
-    TaskInstanceCollectionResponse,
+    ClearTaskInstanceCollectionResponse,
+    NewTaskResponse,
     TaskInstanceResponse,
 )
 from airflow.api_fastapi.core_api.openapi.exceptions import 
create_openapi_http_exception_doc
@@ -296,7 +297,7 @@ def clear_dag_run(
     body: DAGRunClearBody,
     dag_bag: DagBagDep,
     session: SessionDep,
-) -> TaskInstanceCollectionResponse | DAGRunResponse:
+) -> ClearTaskInstanceCollectionResponse | DAGRunResponse:
     dag_run = session.scalar(
         select(DagRun).filter_by(dag_id=dag_id, 
run_id=dag_run_id).options(joinedload(DagRun.dag_model))
     )
@@ -308,27 +309,39 @@ def clear_dag_run(
 
     dag = dag_bag.get_dag_for_run(dag_run, session=session)
 
+    if not dag:
+        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
+
     if body.dry_run:
-        if not dag:
-            raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id 
{dag_id} was not found")
-        task_instances = dag.clear(
+        task_instances_or_ids = dag.clear(
             run_id=dag_run_id,
             task_ids=None,
+            only_new=body.only_new,
             only_failed=body.only_failed,
             run_on_latest_version=body.run_on_latest_version,
             dry_run=True,
             session=session,
         )
 
-        return TaskInstanceCollectionResponse(
-            task_instances=cast("list[TaskInstanceResponse]", task_instances),
+        if body.only_new:
+            # Create lightweight NewTaskResponse objects for new tasks
+            new_task_ids = cast("set[str]", task_instances_or_ids)
+            task_instances: list[TaskInstanceResponse | NewTaskResponse] = [
+                NewTaskResponse(task_id=task_id, task_display_name=task_id)
+                for task_id in sorted(new_task_ids)
+            ]
+        else:
+            task_instances = cast("list[TaskInstanceResponse | 
NewTaskResponse]", task_instances_or_ids)
+
+        return ClearTaskInstanceCollectionResponse(
+            task_instances=task_instances,
             total_entries=len(task_instances),
         )
-    if not dag:
-        raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} 
was not found")
+
     dag.clear(
         run_id=dag_run_id,
         task_ids=None,
+        only_new=body.only_new,
         only_failed=body.only_failed,
         run_on_latest_version=body.run_on_latest_version,
         session=session,
diff --git a/airflow-core/src/airflow/models/taskinstance.py 
b/airflow-core/src/airflow/models/taskinstance.py
index 15b8d796443..3e2cf88beaf 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -262,7 +262,14 @@ def _get_new_task_ids(
     if not latest_dag:
         raise ValueError(f"Latest DAG version for '{dag_id}' not found")
 
-    current_dag = scheduler_dagbag.get_dag_for_run(dag_run=dag_run, 
session=session)
+    # Use created_dag_version_id directly to get the DAG version the run was
+    # originally created with. We cannot use get_dag_for_run here because it
+    # falls back to the latest version when bundle_version is not set (e.g.
+    # LocalDagBundle), which would make current_dag == latest_dag and the diff
+    # always empty.
+    current_dag = None
+    if dag_run.created_dag_version_id:
+        current_dag = 
scheduler_dagbag.get_dag(version_id=dag_run.created_dag_version_id, 
session=session)
     new_task_ids = set(latest_dag.task_ids) - set(current_dag.task_ids) if 
current_dag else set()
 
     return list(new_task_ids)
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py 
b/airflow-core/src/airflow/serialization/definitions/dag.py
index 8844a18412c..a1589f62995 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -944,6 +944,23 @@ class SerializedDAG:
         run_on_latest_version: bool = False,
     ) -> list[TaskInstance]: ...  # pragma: no cover
 
+    @overload
+    def clear(
+        self,
+        *,
+        dry_run: Literal[True],
+        task_ids: Collection[str | tuple[str, int]] | None = None,
+        run_id: str,
+        only_failed: bool = False,
+        only_running: bool = False,
+        only_new: bool,
+        dag_run_state: DagRunState = DagRunState.QUEUED,
+        session: Session = NEW_SESSION,
+        exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = 
frozenset(),
+        exclude_run_ids: frozenset[str] | None = frozenset(),
+        run_on_latest_version: bool = False,
+    ) -> set[str] | list[TaskInstance]: ...  # pragma: no cover
+
     @overload
     def clear(
         self,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8cb0d297901..b2b7beb5418 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1270,6 +1270,33 @@ export const $BulkUpdateAction_VariableBody_ = {
     title: 'BulkUpdateAction[VariableBody]'
 } as const;
 
+export const $ClearTaskInstanceCollectionResponse = {
+    properties: {
+        task_instances: {
+            items: {
+                oneOf: [
+                    {
+                        '$ref': '#/components/schemas/TaskInstanceResponse'
+                    },
+                    {
+                        '$ref': '#/components/schemas/NewTaskResponse'
+                    }
+                ]
+            },
+            type: 'array',
+            title: 'Task Instances'
+        },
+        total_entries: {
+            type: 'integer',
+            title: 'Total Entries'
+        }
+    },
+    type: 'object',
+    required: ['task_instances', 'total_entries'],
+    title: 'ClearTaskInstanceCollectionResponse',
+    description: 'Response for clear dag run dry run, which may contain new 
tasks without full TaskInstance data.'
+} as const;
+
 export const $ClearTaskInstancesBody = {
     properties: {
         dry_run: {
@@ -2467,6 +2494,12 @@ export const $DAGRunClearBody = {
             title: 'Only Failed',
             default: false
         },
+        only_new: {
+            type: 'boolean',
+            title: 'Only New',
+            description: 'Only queue newly added tasks in the latest DAG 
version without clearing existing tasks.',
+            default: false
+        },
         run_on_latest_version: {
             type: 'boolean',
             title: 'Run On Latest Version',
@@ -4608,6 +4641,23 @@ export const $MaterializeAssetBody = {
     description: 'Materialize asset request.'
 } as const;
 
+export const $NewTaskResponse = {
+    properties: {
+        task_id: {
+            type: 'string',
+            title: 'Task Id'
+        },
+        task_display_name: {
+            type: 'string',
+            title: 'Task Display Name'
+        }
+    },
+    type: 'object',
+    required: ['task_id', 'task_display_name'],
+    title: 'NewTaskResponse',
+    description: "Lightweight response for new tasks that don't have 
TaskInstances yet."
+} as const;
+
 export const $PatchTaskInstanceBody = {
     properties: {
         new_state: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts 
b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 7bac12137b6..cca921b6bef 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -400,6 +400,14 @@ export type BulkUpdateAction_VariableBody_ = {
     action_on_non_existence?: BulkActionNotOnExistence;
 };
 
+/**
+ * Response for clear dag run dry run, which may contain new tasks without 
full TaskInstance data.
+ */
+export type ClearTaskInstanceCollectionResponse = {
+    task_instances: Array<(TaskInstanceResponse | NewTaskResponse)>;
+    total_entries: number;
+};
+
 /**
  * Request body for Clear Task Instances endpoint.
  */
@@ -644,6 +652,10 @@ export type DAGResponse = {
 export type DAGRunClearBody = {
     dry_run?: boolean;
     only_failed?: boolean;
+    /**
+     * Only queue newly added tasks in the latest DAG version without clearing 
existing tasks.
+     */
+    only_new?: boolean;
     /**
      * (Experimental) Run on the latest bundle version of the Dag after 
clearing the Dag Run.
      */
@@ -1167,6 +1179,14 @@ export type MaterializeAssetBody = {
     partition_key?: string | null;
 };
 
+/**
+ * Lightweight response for new tasks that don't have TaskInstances yet.
+ */
+export type NewTaskResponse = {
+    task_id: string;
+    task_display_name: string;
+};
+
 /**
  * Request body for Clear Task Instances endpoint.
  */
@@ -2626,7 +2646,7 @@ export type ClearDagRunData = {
     requestBody: DAGRunClearBody;
 };
 
-export type ClearDagRunResponse = TaskInstanceCollectionResponse | 
DAGRunResponse;
+export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse | 
DAGRunResponse;
 
 export type GetDagRunsData = {
     bundleVersion?: string | null;
@@ -4947,7 +4967,7 @@ export type $OpenApiTs = {
                 /**
                  * Successful Response
                  */
-                200: TaskInstanceCollectionResponse | DAGRunResponse;
+                200: ClearTaskInstanceCollectionResponse | DAGRunResponse;
                 /**
                  * Unauthorized
                  */
diff --git 
a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx 
b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
index a4b53c10428..6c36b478495 100644
--- a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
@@ -22,7 +22,7 @@ import { useTranslation } from "react-i18next";
 import { CgRedo } from "react-icons/cg";
 
 import { useDagServiceGetDagDetails } from "openapi/queries";
-import type { DAGRunResponse, TaskInstanceResponse } from 
"openapi/requests/types.gen";
+import type { DAGRunResponse } from "openapi/requests/types.gen";
 import { ActionAccordion } from "src/components/ActionAccordion";
 import { Checkbox, Dialog } from "src/components/ui";
 import SegmentedControl from "src/components/ui/SegmentedControl";
@@ -45,6 +45,7 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => {
   const [note, setNote] = useState<string | null>(dagRun.note);
   const [selectedOptions, setSelectedOptions] = 
useState<Array<string>>(["existingTasks"]);
   const onlyFailed = selectedOptions.includes("onlyFailed");
+  const onlyNew = selectedOptions.includes("newTasks");
   const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
 
   // Get current DAG's bundle version to compare with DAG run's bundle version
@@ -59,11 +60,15 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) 
=> {
     dagRunId,
     options: {
       refetchInterval: (query) =>
-        query.state.data?.task_instances.some((ti: TaskInstanceResponse) => 
isStatePending(ti.state))
+        query.state.data?.task_instances.some((ti) => "state" in ti && 
isStatePending(ti.state))
           ? refetchInterval
           : false,
     },
-    requestBody: { only_failed: onlyFailed, run_on_latest_version: 
runOnLatestVersion },
+    requestBody: {
+      only_failed: onlyFailed,
+      only_new: onlyNew,
+      run_on_latest_version: runOnLatestVersion,
+    },
   });
 
   const { isPending, mutate } = useClearDagRun({
@@ -78,12 +83,14 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) 
=> {
     onSuccess: onClose,
   });
 
-  // Check if bundle versions are different
-  const currentDagBundleVersion = dagDetails?.bundle_version;
-  const dagRunBundleVersion = dagRun.bundle_version;
-  const bundleVersionsDiffer = currentDagBundleVersion !== dagRunBundleVersion;
-  const shouldShowBundleVersionOption =
-    bundleVersionsDiffer && dagRunBundleVersion !== null && 
dagRunBundleVersion !== "";
+  // Check if DAG versions differ (works for both bundle-versioned and local 
bundles)
+  const latestDagVersionNumber = 
dagDetails?.latest_dag_version?.version_number;
+  const dagRunVersionNumber = dagRun.dag_versions.at(-1)?.version_number;
+  const versionsDiffer =
+    latestDagVersionNumber !== undefined &&
+    dagRunVersionNumber !== undefined &&
+    latestDagVersionNumber !== dagRunVersionNumber;
+  const shouldShowBundleVersionOption = versionsDiffer && !onlyNew;
 
   return (
     <Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl">
@@ -116,9 +123,8 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) 
=> {
                   value: "onlyFailed",
                 },
                 {
-                  disabled: true,
                   label: translate("dags:runAndTaskActions.options.queueNew"),
-                  value: "new_tasks",
+                  value: "newTasks",
                 },
               ]}
             />
@@ -148,6 +154,7 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) 
=> {
                   requestBody: {
                     dry_run: false,
                     only_failed: onlyFailed,
+                    only_new: onlyNew,
                     run_on_latest_version: runOnLatestVersion,
                   },
                 });
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts 
b/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts
index 2109df3d081..2e752395310 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts
@@ -47,5 +47,10 @@ export const useClearDagRunDryRun = <TData = 
TaskInstanceCollectionResponse, TEr
           ...requestBody,
         },
       }) as TData,
-    queryKey: [useClearDagRunDryRunKey, dagId, dagRunId, { only_failed: 
requestBody.only_failed }],
+    queryKey: [
+      useClearDagRunDryRunKey,
+      dagId,
+      dagRunId,
+      { only_failed: requestBody.only_failed, only_new: requestBody.only_new },
+    ],
   });
diff --git 
a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py 
b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 50dbf0190a4..34ad144406e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1729,6 +1729,90 @@ class TestClearDagRun:
         assert body["detail"][0]["msg"] == "Field required"
         assert body["detail"][0]["loc"][0] == "body"
 
+    @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.clear")
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_clear_dag_run_only_new_dry_run(self, mock_clear, test_client, 
session):
+        """Test that only_new dry_run returns placeholder task instances for 
new tasks."""
+        mock_clear.return_value = {"new_task_1", "new_task_2", "new_task_3"}
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+            json={"dry_run": True, "only_new": True},
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["total_entries"] == 3
+        # Verify new tasks are returned with correct task_ids in task_instances
+        task_ids = sorted(t["task_id"] for t in body["task_instances"])
+        assert task_ids == ["new_task_1", "new_task_2", "new_task_3"]
+        # Verify task_display_name defaults to task_id
+        for task in body["task_instances"]:
+            assert task["task_display_name"] == task["task_id"]
+        mock_clear.assert_called_once_with(
+            run_id=DAG1_RUN1_ID,
+            task_ids=None,
+            only_new=True,
+            only_failed=False,
+            run_on_latest_version=False,
+            dry_run=True,
+            session=mock.ANY,
+        )
+        logs = session.scalar(
+            select(func.count())
+            .select_from(Log)
+            .where(Log.dag_id == DAG1_ID, Log.run_id == DAG1_RUN1_ID, 
Log.event == "clear_dag_run")
+        )
+        assert logs == 0
+
+    @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.clear")
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_clear_dag_run_only_new_dry_run_no_new_tasks(self, mock_clear, 
test_client, session):
+        """Test that only_new dry_run returns 0 total_entries when there are 
no new tasks."""
+        mock_clear.return_value = set()
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+            json={"dry_run": True, "only_new": True},
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["task_instances"] == []
+        assert body["total_entries"] == 0
+
+    @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.clear")
+    @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+    def test_clear_dag_run_only_new_non_dry_run(self, mock_clear, test_client, 
session):
+        """Test that only_new non-dry_run clears and returns a 
DAGRunResponse."""
+        mock_clear.return_value = 2
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+            json={"dry_run": False, "only_new": True},
+        )
+        assert response.status_code == 200
+        body = response.json()
+        assert body["dag_id"] == DAG1_ID
+        assert body["dag_run_id"] == DAG1_RUN1_ID
+        mock_clear.assert_called_once_with(
+            run_id=DAG1_RUN1_ID,
+            task_ids=None,
+            only_new=True,
+            only_failed=False,
+            run_on_latest_version=False,
+            session=mock.ANY,
+        )
+        _check_last_log(
+            session,
+            dag_id=DAG1_ID,
+            event="clear_dag_run",
+            logical_date=None,
+        )
+
+    def test_clear_dag_run_only_new_and_only_failed_mutually_exclusive(self, 
test_client):
+        """Test that only_new and only_failed cannot both be True."""
+        response = test_client.post(
+            f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+            json={"dry_run": True, "only_new": True, "only_failed": True},
+        )
+        assert response.status_code == 422
+
 
 class TestTriggerDagRun:
     def _dags_for_trigger_tests(self, session=None):
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py 
b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index b492406604c..3e158bbf739 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -287,6 +287,13 @@ class DAGRunClearBody(BaseModel):
     )
     dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
     only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
+    only_new: Annotated[
+        bool | None,
+        Field(
+            description="Only queue newly added tasks in the latest DAG 
version without clearing existing tasks.",
+            title="Only New",
+        ),
+    ] = False
     run_on_latest_version: Annotated[
         bool | None,
         Field(
@@ -638,6 +645,15 @@ class MaterializeAssetBody(BaseModel):
     partition_key: Annotated[str | None, Field(title="Partition Key")] = None
 
 
+class NewTaskResponse(BaseModel):
+    """
+    Lightweight response for new tasks that don't have TaskInstances yet.
+    """
+
+    task_id: Annotated[str, Field(title="Task Id")]
+    task_display_name: Annotated[str, Field(title="Task Display Name")]
+
+
 class PluginImportErrorResponse(BaseModel):
     """
     Plugin Import Error serializer for responses.
@@ -1932,6 +1948,15 @@ class BulkDeleteActionBulkTaskInstanceBody(BaseModel):
     action_on_non_existence: BulkActionNotOnExistence | None = "fail"
 
 
+class ClearTaskInstanceCollectionResponse(BaseModel):
+    """
+    Response for clear dag run dry run, which may contain new tasks without 
full TaskInstance data.
+    """
+
+    task_instances: Annotated[list[TaskInstanceResponse | NewTaskResponse], 
Field(title="Task Instances")]
+    total_entries: Annotated[int, Field(title="Total Entries")]
+
+
 class DAGCollectionResponse(BaseModel):
     """
     Dag Collection serializer for responses.

Reply via email to