This is an automated email from the ASF dual-hosted git repository.
bbovenzi pushed a commit to branch v3-2-test
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-2-test by this push:
new f0ec21a364b feat: enable queue up new tasks (#63484) (#66869)
f0ec21a364b is described below
commit f0ec21a364b3ade2ba3e6131346f2e528f63a884
Author: Rahul Vats <[email protected]>
AuthorDate: Sat May 16 00:40:12 2026 +0530
feat: enable queue up new tasks (#63484) (#66869)
* feat: enable queue up new tasks
* chore: remove redundant disable flag from button
* feat: create and pass NewTaskCollectionResponse
* cleanup: unused translations
* fix: remove additional return type and simplify code
* remove redundancy
* fix: remove unused var
* fix: mypy and tests
* fix: more mypy
* cleanup
* fix: mypy
* fix: pnpm lint
* cleanup
(cherry picked from commit dae48ba722c420010de2c831587d005923644fc3)
Co-authored-by: Oscar Ligthart <[email protected]>
---
.../api_fastapi/core_api/datamodels/dag_run.py | 14 +++-
.../core_api/datamodels/task_instances.py | 31 ++++++++
.../core_api/openapi/v2-rest-api-generated.yaml | 42 ++++++++++-
.../api_fastapi/core_api/routes/public/dag_run.py | 31 +++++---
airflow-core/src/airflow/models/taskinstance.py | 9 ++-
.../src/airflow/serialization/definitions/dag.py | 17 +++++
.../airflow/ui/openapi-gen/requests/schemas.gen.ts | 50 +++++++++++++
.../airflow/ui/openapi-gen/requests/types.gen.ts | 24 ++++++-
.../ui/src/components/Clear/Run/ClearRunDialog.tsx | 29 +++++---
.../airflow/ui/src/queries/useClearDagRunDryRun.ts | 7 +-
.../core_api/routes/public/test_dag_run.py | 84 ++++++++++++++++++++++
.../src/airflowctl/api/datamodels/generated.py | 25 +++++++
12 files changed, 337 insertions(+), 26 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
index e88aad3d6d1..510015a9193 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/dag_run.py
@@ -20,7 +20,7 @@ from __future__ import annotations
from collections.abc import Iterable
from datetime import datetime
from enum import Enum
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Any
from pydantic import AliasPath, AwareDatetime, Field, NonNegativeInt, model_validator
@@ -55,11 +55,23 @@ class DAGRunClearBody(StrictBaseModel):
dry_run: bool = True
only_failed: bool = False
+ only_new: bool = Field(
+ default=False,
+ description="Only queue newly added tasks in the latest DAG version without clearing existing tasks.",
+ )
run_on_latest_version: bool = Field(
default=False,
description="(Experimental) Run on the latest bundle version of the Dag after clearing the Dag Run.",
)
+ @model_validator(mode="before")
+ @classmethod
+ def validate_model(cls, data: Any) -> Any:
+ """Validate clear DAG run form."""
+ if data.get("only_new") and data.get("only_failed"):
+ raise ValueError("only_new and only_failed are mutually exclusive")
+ return data
+
class DAGRunResponse(BaseModel):
"""Dag Run serializer for responses."""
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
index 979218f3151..ae0e6c31f47 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/task_instances.py
@@ -25,9 +25,11 @@ from pydantic import (
AliasPath,
AwareDatetime,
BeforeValidator,
+ Discriminator,
Field,
NonNegativeInt,
StringConstraints,
+ Tag,
ValidationError,
field_validator,
model_validator,
@@ -40,6 +42,13 @@ from airflow.api_fastapi.core_api.datamodels.trigger import TriggerResponse
from airflow.utils.state import TaskInstanceState
+class NewTaskResponse(BaseModel):
+ """Lightweight response for new tasks that don't have TaskInstances yet."""
+
+ task_id: str
+ task_display_name: str
+
+
class TaskInstanceResponse(BaseModel):
"""TaskInstance serializer for responses."""
@@ -112,6 +121,28 @@ class TaskInstanceCollectionResponse(BaseModel):
)
+def _task_instance_discriminator(v: Any) -> str:
+ """Discriminate between TaskInstanceResponse and NewTaskResponse in the
union."""
+ if isinstance(v, NewTaskResponse):
+ return "new"
+ if isinstance(v, dict):
+ return "new" if "id" not in v else "full"
+ # ORM objects and TaskInstanceResponse instances
+ return "full"
+
+
+class ClearTaskInstanceCollectionResponse(BaseModel):
+ """Response for clear dag run dry run, which may contain new tasks without
full TaskInstance data."""
+
+ task_instances: Iterable[
+ Annotated[
+ Annotated[TaskInstanceResponse, Tag("full")] | Annotated[NewTaskResponse, Tag("new")],
+ Discriminator(_task_instance_discriminator),
+ ]
+ ]
+ total_entries: int
+
+
class TaskDependencyResponse(BaseModel):
"""Task Dependency serializer for responses."""
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
index 767cdfb29d7..9d8b59ecaea 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
+++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml
@@ -2142,7 +2142,7 @@ paths:
application/json:
schema:
anyOf:
- - $ref: '#/components/schemas/TaskInstanceCollectionResponse'
+ - $ref:
'#/components/schemas/ClearTaskInstanceCollectionResponse'
- $ref: '#/components/schemas/DAGRunResponse'
title: Response Clear Dag Run
'401':
@@ -11072,6 +11072,25 @@ components:
- action
- entities
title: BulkUpdateAction[VariableBody]
+ ClearTaskInstanceCollectionResponse:
+ properties:
+ task_instances:
+ items:
+ oneOf:
+ - $ref: '#/components/schemas/TaskInstanceResponse'
+ - $ref: '#/components/schemas/NewTaskResponse'
+ type: array
+ title: Task Instances
+ total_entries:
+ type: integer
+ title: Total Entries
+ type: object
+ required:
+ - task_instances
+ - total_entries
+ title: ClearTaskInstanceCollectionResponse
+ description: Response for clear dag run dry run, which may contain new tasks
+ without full TaskInstance data.
ClearTaskInstancesBody:
properties:
dry_run:
@@ -11852,6 +11871,12 @@ components:
type: boolean
title: Only Failed
default: false
+ only_new:
+ type: boolean
+ title: Only New
+ description: Only queue newly added tasks in the latest DAG version without
+ clearing existing tasks.
+ default: false
run_on_latest_version:
type: boolean
title: Run On Latest Version
@@ -13271,6 +13296,21 @@ components:
type: object
title: MaterializeAssetBody
description: Materialize asset request.
+ NewTaskResponse:
+ properties:
+ task_id:
+ type: string
+ title: Task Id
+ task_display_name:
+ type: string
+ title: Task Display Name
+ type: object
+ required:
+ - task_id
+ - task_display_name
+ title: NewTaskResponse
+ description: Lightweight response for new tasks that don't have TaskInstances
+ yet.
PatchTaskInstanceBody:
properties:
new_state:
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
index e168bd782f3..5e33c844aeb 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py
@@ -85,7 +85,8 @@ from airflow.api_fastapi.core_api.datamodels.dag_run import (
TriggerDAGRunPostBody,
)
from airflow.api_fastapi.core_api.datamodels.task_instances import (
- TaskInstanceCollectionResponse,
+ ClearTaskInstanceCollectionResponse,
+ NewTaskResponse,
TaskInstanceResponse,
)
from airflow.api_fastapi.core_api.openapi.exceptions import
create_openapi_http_exception_doc
@@ -296,7 +297,7 @@ def clear_dag_run(
body: DAGRunClearBody,
dag_bag: DagBagDep,
session: SessionDep,
-) -> TaskInstanceCollectionResponse | DAGRunResponse:
+) -> ClearTaskInstanceCollectionResponse | DAGRunResponse:
dag_run = session.scalar(
select(DagRun).filter_by(dag_id=dag_id,
run_id=dag_run_id).options(joinedload(DagRun.dag_model))
)
@@ -308,27 +309,39 @@ def clear_dag_run(
dag = dag_bag.get_dag_for_run(dag_run, session=session)
+ if not dag:
+ raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
if body.dry_run:
- if not dag:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
- task_instances = dag.clear(
+ task_instances_or_ids = dag.clear(
run_id=dag_run_id,
task_ids=None,
+ only_new=body.only_new,
only_failed=body.only_failed,
run_on_latest_version=body.run_on_latest_version,
dry_run=True,
session=session,
)
- return TaskInstanceCollectionResponse(
- task_instances=cast("list[TaskInstanceResponse]", task_instances),
+ if body.only_new:
+ # Create lightweight NewTaskResponse objects for new tasks
+ new_task_ids = cast("set[str]", task_instances_or_ids)
+ task_instances: list[TaskInstanceResponse | NewTaskResponse] = [
+ NewTaskResponse(task_id=task_id, task_display_name=task_id)
+ for task_id in sorted(new_task_ids)
+ ]
+ else:
+ task_instances = cast("list[TaskInstanceResponse | NewTaskResponse]", task_instances_or_ids)
+
+ return ClearTaskInstanceCollectionResponse(
+ task_instances=task_instances,
total_entries=len(task_instances),
)
- if not dag:
- raise HTTPException(status.HTTP_404_NOT_FOUND, f"Dag with id {dag_id} was not found")
+
dag.clear(
run_id=dag_run_id,
task_ids=None,
+ only_new=body.only_new,
only_failed=body.only_failed,
run_on_latest_version=body.run_on_latest_version,
session=session,
diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 15b8d796443..3e2cf88beaf 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -262,7 +262,14 @@ def _get_new_task_ids(
if not latest_dag:
raise ValueError(f"Latest DAG version for '{dag_id}' not found")
- current_dag = scheduler_dagbag.get_dag_for_run(dag_run=dag_run, session=session)
+ # Use created_dag_version_id directly to get the DAG version the run was
+ # originally created with. We cannot use get_dag_for_run here because it
+ # falls back to the latest version when bundle_version is not set (e.g.
+ # LocalDagBundle), which would make current_dag == latest_dag and the diff
+ # always empty.
+ current_dag = None
+ if dag_run.created_dag_version_id:
+ current_dag = scheduler_dagbag.get_dag(version_id=dag_run.created_dag_version_id, session=session)
new_task_ids = set(latest_dag.task_ids) - set(current_dag.task_ids) if current_dag else set()
return list(new_task_ids)
diff --git a/airflow-core/src/airflow/serialization/definitions/dag.py b/airflow-core/src/airflow/serialization/definitions/dag.py
index 8844a18412c..a1589f62995 100644
--- a/airflow-core/src/airflow/serialization/definitions/dag.py
+++ b/airflow-core/src/airflow/serialization/definitions/dag.py
@@ -944,6 +944,23 @@ class SerializedDAG:
run_on_latest_version: bool = False,
) -> list[TaskInstance]: ... # pragma: no cover
+ @overload
+ def clear(
+ self,
+ *,
+ dry_run: Literal[True],
+ task_ids: Collection[str | tuple[str, int]] | None = None,
+ run_id: str,
+ only_failed: bool = False,
+ only_running: bool = False,
+ only_new: bool,
+ dag_run_state: DagRunState = DagRunState.QUEUED,
+ session: Session = NEW_SESSION,
+ exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None =
frozenset(),
+ exclude_run_ids: frozenset[str] | None = frozenset(),
+ run_on_latest_version: bool = False,
+ ) -> set[str] | list[TaskInstance]: ... # pragma: no cover
+
@overload
def clear(
self,
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
index 8cb0d297901..b2b7beb5418 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
@@ -1270,6 +1270,33 @@ export const $BulkUpdateAction_VariableBody_ = {
title: 'BulkUpdateAction[VariableBody]'
} as const;
+export const $ClearTaskInstanceCollectionResponse = {
+ properties: {
+ task_instances: {
+ items: {
+ oneOf: [
+ {
+ '$ref': '#/components/schemas/TaskInstanceResponse'
+ },
+ {
+ '$ref': '#/components/schemas/NewTaskResponse'
+ }
+ ]
+ },
+ type: 'array',
+ title: 'Task Instances'
+ },
+ total_entries: {
+ type: 'integer',
+ title: 'Total Entries'
+ }
+ },
+ type: 'object',
+ required: ['task_instances', 'total_entries'],
+ title: 'ClearTaskInstanceCollectionResponse',
+ description: 'Response for clear dag run dry run, which may contain new tasks without full TaskInstance data.'
+} as const;
+
export const $ClearTaskInstancesBody = {
properties: {
dry_run: {
@@ -2467,6 +2494,12 @@ export const $DAGRunClearBody = {
title: 'Only Failed',
default: false
},
+ only_new: {
+ type: 'boolean',
+ title: 'Only New',
+ description: 'Only queue newly added tasks in the latest DAG version without clearing existing tasks.',
+ default: false
+ },
run_on_latest_version: {
type: 'boolean',
title: 'Run On Latest Version',
@@ -4608,6 +4641,23 @@ export const $MaterializeAssetBody = {
description: 'Materialize asset request.'
} as const;
+export const $NewTaskResponse = {
+ properties: {
+ task_id: {
+ type: 'string',
+ title: 'Task Id'
+ },
+ task_display_name: {
+ type: 'string',
+ title: 'Task Display Name'
+ }
+ },
+ type: 'object',
+ required: ['task_id', 'task_display_name'],
+ title: 'NewTaskResponse',
+ description: "Lightweight response for new tasks that don't have TaskInstances yet."
+} as const;
+
export const $PatchTaskInstanceBody = {
properties: {
new_state: {
diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
index 7bac12137b6..cca921b6bef 100644
--- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
+++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
@@ -400,6 +400,14 @@ export type BulkUpdateAction_VariableBody_ = {
action_on_non_existence?: BulkActionNotOnExistence;
};
+/**
+ * Response for clear dag run dry run, which may contain new tasks without
full TaskInstance data.
+ */
+export type ClearTaskInstanceCollectionResponse = {
+ task_instances: Array<(TaskInstanceResponse | NewTaskResponse)>;
+ total_entries: number;
+};
+
/**
* Request body for Clear Task Instances endpoint.
*/
@@ -644,6 +652,10 @@ export type DAGResponse = {
export type DAGRunClearBody = {
dry_run?: boolean;
only_failed?: boolean;
+ /**
+ * Only queue newly added tasks in the latest DAG version without clearing
existing tasks.
+ */
+ only_new?: boolean;
/**
* (Experimental) Run on the latest bundle version of the Dag after
clearing the Dag Run.
*/
@@ -1167,6 +1179,14 @@ export type MaterializeAssetBody = {
partition_key?: string | null;
};
+/**
+ * Lightweight response for new tasks that don't have TaskInstances yet.
+ */
+export type NewTaskResponse = {
+ task_id: string;
+ task_display_name: string;
+};
+
/**
* Request body for Clear Task Instances endpoint.
*/
@@ -2626,7 +2646,7 @@ export type ClearDagRunData = {
requestBody: DAGRunClearBody;
};
-export type ClearDagRunResponse = TaskInstanceCollectionResponse |
DAGRunResponse;
+export type ClearDagRunResponse = ClearTaskInstanceCollectionResponse |
DAGRunResponse;
export type GetDagRunsData = {
bundleVersion?: string | null;
@@ -4947,7 +4967,7 @@ export type $OpenApiTs = {
/**
* Successful Response
*/
- 200: TaskInstanceCollectionResponse | DAGRunResponse;
+ 200: ClearTaskInstanceCollectionResponse | DAGRunResponse;
/**
* Unauthorized
*/
diff --git a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
index a4b53c10428..6c36b478495 100644
--- a/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
+++ b/airflow-core/src/airflow/ui/src/components/Clear/Run/ClearRunDialog.tsx
@@ -22,7 +22,7 @@ import { useTranslation } from "react-i18next";
import { CgRedo } from "react-icons/cg";
import { useDagServiceGetDagDetails } from "openapi/queries";
-import type { DAGRunResponse, TaskInstanceResponse } from
"openapi/requests/types.gen";
+import type { DAGRunResponse } from "openapi/requests/types.gen";
import { ActionAccordion } from "src/components/ActionAccordion";
import { Checkbox, Dialog } from "src/components/ui";
import SegmentedControl from "src/components/ui/SegmentedControl";
@@ -45,6 +45,7 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props) => {
const [note, setNote] = useState<string | null>(dagRun.note);
const [selectedOptions, setSelectedOptions] =
useState<Array<string>>(["existingTasks"]);
const onlyFailed = selectedOptions.includes("onlyFailed");
+ const onlyNew = selectedOptions.includes("newTasks");
const [runOnLatestVersion, setRunOnLatestVersion] = useState(false);
// Get current DAG's bundle version to compare with DAG run's bundle version
@@ -59,11 +60,15 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
dagRunId,
options: {
refetchInterval: (query) =>
- query.state.data?.task_instances.some((ti: TaskInstanceResponse) =>
isStatePending(ti.state))
+ query.state.data?.task_instances.some((ti) => "state" in ti &&
isStatePending(ti.state))
? refetchInterval
: false,
},
- requestBody: { only_failed: onlyFailed, run_on_latest_version:
runOnLatestVersion },
+ requestBody: {
+ only_failed: onlyFailed,
+ only_new: onlyNew,
+ run_on_latest_version: runOnLatestVersion,
+ },
});
const { isPending, mutate } = useClearDagRun({
@@ -78,12 +83,14 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
onSuccess: onClose,
});
- // Check if bundle versions are different
- const currentDagBundleVersion = dagDetails?.bundle_version;
- const dagRunBundleVersion = dagRun.bundle_version;
- const bundleVersionsDiffer = currentDagBundleVersion !== dagRunBundleVersion;
- const shouldShowBundleVersionOption =
- bundleVersionsDiffer && dagRunBundleVersion !== null &&
dagRunBundleVersion !== "";
+ // Check if DAG versions differ (works for both bundle-versioned and local
bundles)
+ const latestDagVersionNumber =
dagDetails?.latest_dag_version?.version_number;
+ const dagRunVersionNumber = dagRun.dag_versions.at(-1)?.version_number;
+ const versionsDiffer =
+ latestDagVersionNumber !== undefined &&
+ dagRunVersionNumber !== undefined &&
+ latestDagVersionNumber !== dagRunVersionNumber;
+ const shouldShowBundleVersionOption = versionsDiffer && !onlyNew;
return (
<Dialog.Root lazyMount onOpenChange={onClose} open={open} size="xl">
@@ -116,9 +123,8 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
value: "onlyFailed",
},
{
- disabled: true,
label: translate("dags:runAndTaskActions.options.queueNew"),
- value: "new_tasks",
+ value: "newTasks",
},
]}
/>
@@ -148,6 +154,7 @@ const ClearRunDialog = ({ dagRun, onClose, open }: Props)
=> {
requestBody: {
dry_run: false,
only_failed: onlyFailed,
+ only_new: onlyNew,
run_on_latest_version: runOnLatestVersion,
},
});
diff --git a/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts b/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts
index 2109df3d081..2e752395310 100644
--- a/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts
+++ b/airflow-core/src/airflow/ui/src/queries/useClearDagRunDryRun.ts
@@ -47,5 +47,10 @@ export const useClearDagRunDryRun = <TData =
TaskInstanceCollectionResponse, TEr
...requestBody,
},
}) as TData,
- queryKey: [useClearDagRunDryRunKey, dagId, dagRunId, { only_failed:
requestBody.only_failed }],
+ queryKey: [
+ useClearDagRunDryRunKey,
+ dagId,
+ dagRunId,
+ { only_failed: requestBody.only_failed, only_new: requestBody.only_new },
+ ],
});
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
index 50dbf0190a4..34ad144406e 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_run.py
@@ -1729,6 +1729,90 @@ class TestClearDagRun:
assert body["detail"][0]["msg"] == "Field required"
assert body["detail"][0]["loc"][0] == "body"
+ @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.clear")
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_clear_dag_run_only_new_dry_run(self, mock_clear, test_client,
session):
+ """Test that only_new dry_run returns placeholder task instances for
new tasks."""
+ mock_clear.return_value = {"new_task_1", "new_task_2", "new_task_3"}
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+ json={"dry_run": True, "only_new": True},
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["total_entries"] == 3
+ # Verify new tasks are returned with correct task_ids in task_instances
+ task_ids = sorted(t["task_id"] for t in body["task_instances"])
+ assert task_ids == ["new_task_1", "new_task_2", "new_task_3"]
+ # Verify task_display_name defaults to task_id
+ for task in body["task_instances"]:
+ assert task["task_display_name"] == task["task_id"]
+ mock_clear.assert_called_once_with(
+ run_id=DAG1_RUN1_ID,
+ task_ids=None,
+ only_new=True,
+ only_failed=False,
+ run_on_latest_version=False,
+ dry_run=True,
+ session=mock.ANY,
+ )
+ logs = session.scalar(
+ select(func.count())
+ .select_from(Log)
+ .where(Log.dag_id == DAG1_ID, Log.run_id == DAG1_RUN1_ID,
Log.event == "clear_dag_run")
+ )
+ assert logs == 0
+
+ @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.clear")
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_clear_dag_run_only_new_dry_run_no_new_tasks(self, mock_clear,
test_client, session):
+ """Test that only_new dry_run returns 0 total_entries when there are
no new tasks."""
+ mock_clear.return_value = set()
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+ json={"dry_run": True, "only_new": True},
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["task_instances"] == []
+ assert body["total_entries"] == 0
+
+ @mock.patch("airflow.serialization.definitions.dag.SerializedDAG.clear")
+ @pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
+ def test_clear_dag_run_only_new_non_dry_run(self, mock_clear, test_client,
session):
+ """Test that only_new non-dry_run clears and returns a
DAGRunResponse."""
+ mock_clear.return_value = 2
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+ json={"dry_run": False, "only_new": True},
+ )
+ assert response.status_code == 200
+ body = response.json()
+ assert body["dag_id"] == DAG1_ID
+ assert body["dag_run_id"] == DAG1_RUN1_ID
+ mock_clear.assert_called_once_with(
+ run_id=DAG1_RUN1_ID,
+ task_ids=None,
+ only_new=True,
+ only_failed=False,
+ run_on_latest_version=False,
+ session=mock.ANY,
+ )
+ _check_last_log(
+ session,
+ dag_id=DAG1_ID,
+ event="clear_dag_run",
+ logical_date=None,
+ )
+
+ def test_clear_dag_run_only_new_and_only_failed_mutually_exclusive(self,
test_client):
+ """Test that only_new and only_failed cannot both be True."""
+ response = test_client.post(
+ f"/dags/{DAG1_ID}/dagRuns/{DAG1_RUN1_ID}/clear",
+ json={"dry_run": True, "only_new": True, "only_failed": True},
+ )
+ assert response.status_code == 422
+
class TestTriggerDagRun:
def _dags_for_trigger_tests(self, session=None):
diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
index b492406604c..3e158bbf739 100644
--- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py
+++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py
@@ -287,6 +287,13 @@ class DAGRunClearBody(BaseModel):
)
dry_run: Annotated[bool | None, Field(title="Dry Run")] = True
only_failed: Annotated[bool | None, Field(title="Only Failed")] = False
+ only_new: Annotated[
+ bool | None,
+ Field(
+ description="Only queue newly added tasks in the latest DAG version without clearing existing tasks.",
+ title="Only New",
+ ),
+ ] = False
run_on_latest_version: Annotated[
bool | None,
Field(
@@ -638,6 +645,15 @@ class MaterializeAssetBody(BaseModel):
partition_key: Annotated[str | None, Field(title="Partition Key")] = None
+class NewTaskResponse(BaseModel):
+ """
+ Lightweight response for new tasks that don't have TaskInstances yet.
+ """
+
+ task_id: Annotated[str, Field(title="Task Id")]
+ task_display_name: Annotated[str, Field(title="Task Display Name")]
+
+
class PluginImportErrorResponse(BaseModel):
"""
Plugin Import Error serializer for responses.
@@ -1932,6 +1948,15 @@ class BulkDeleteActionBulkTaskInstanceBody(BaseModel):
action_on_non_existence: BulkActionNotOnExistence | None = "fail"
+class ClearTaskInstanceCollectionResponse(BaseModel):
+ """
+ Response for clear dag run dry run, which may contain new tasks without full TaskInstance data.
+ """
+
+ task_instances: Annotated[list[TaskInstanceResponse | NewTaskResponse],
Field(title="Task Instances")]
+ total_entries: Annotated[int, Field(title="Total Entries")]
+
+
class DAGCollectionResponse(BaseModel):
"""
Dag Collection serializer for responses.