pierrejeambrun commented on code in PR #60161:
URL: https://github.com/apache/airflow/pull/60161#discussion_r2746477780
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -919,13 +997,14 @@ def bulk_task_instances(
user: GetUserDep,
) -> BulkResponse:
"""Bulk update, and delete task instances."""
- return BulkTaskInstanceService(
+ service = BulkTaskInstanceService(
session=session, request=request, dag_id=dag_id,
dag_run_id=dag_run_id, dag_bag=dag_bag, user=user
- ).handle_request()
+ )
+ return service.handle_request()
Review Comment:
This change doesn't seem related and should be removed.
##########
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py:
##########
@@ -906,6 +964,26 @@ def patch_task_instance_dry_run(
)
+@task_instances_router.patch(
+ task_instances_prefix + "/dry_run",
+ dependencies=[Depends(requires_access_dag(method="PUT",
access_entity=DagAccessEntity.TASK_INSTANCE))],
+ operation_id="bulk_task_instances_dry_run",
+)
+def bulk_task_instances_dry_run(
+ request: BulkBody[BulkTaskInstanceBody],
+ session: SessionDep,
+ dag_id: str,
+ dag_bag: DagBagDep,
+ dag_run_id: str,
+ user: GetUserDep,
+) -> TaskInstanceCollectionResponse:
+ """Bulk update task instances dry run - returns affected task instances
without making changes."""
+ service = BulkTaskInstanceService(
+ session=session, request=request, dag_id=dag_id,
dag_run_id=dag_run_id, dag_bag=dag_bag, user=user
+ )
+ return service.handle_request_dry_run()
Review Comment:
This change doesn't seem related and should be removed.
##########
airflow-core/src/airflow/ui/src/components/MarkAs/TaskInstance/MarkGroupTaskInstanceAsDialog.tsx:
##########
@@ -0,0 +1,168 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { Button, Flex, Heading, VStack } from "@chakra-ui/react";
+import { useState } from "react";
+import { useTranslation } from "react-i18next";
+import { useParams } from "react-router-dom";
+
+import type { LightGridTaskInstanceSummary, TaskInstanceState } from
"openapi/requests/types.gen";
+import { ActionAccordion } from "src/components/ActionAccordion";
+import { StateBadge } from "src/components/StateBadge";
+import Time from "src/components/Time";
+import { Dialog } from "src/components/ui";
+import SegmentedControl from "src/components/ui/SegmentedControl";
+import { usePatchTaskGroup } from "src/queries/usePatchTaskGroup";
+import { usePatchTaskGroupDryRun } from "src/queries/usePatchTaskGroupDryRun";
+
+type Props = {
+ readonly groupTaskInstance: LightGridTaskInstanceSummary;
+ readonly onClose: () => void;
+ readonly open: boolean;
+ readonly state: TaskInstanceState;
+};
+
+const MarkGroupTaskInstanceAsDialog = ({ groupTaskInstance, onClose, open,
state }: Props) => {
+ const { t: translate } = useTranslation();
+ const { dagId = "", runId = "" } = useParams();
+ const groupId = groupTaskInstance.task_id;
+
+ const [selectedOptions, setSelectedOptions] = useState<Array<string>>([]);
+
+ const past = selectedOptions.includes("past");
+ const future = selectedOptions.includes("future");
+ const upstream = selectedOptions.includes("upstream");
+ const downstream = selectedOptions.includes("downstream");
+
+ // eslint-disable-next-line unicorn/no-null -- DAGRunResponse["note"] type
requires null, not undefined
+ const [note, setNote] = useState<string | null>(null);
Review Comment:
Initialise with `""` and replace it with null at the backend call so you don't
need that `eslint-disable-next-line unicorn/no-null`.
Check the other uses of `[note, setNote]` in the code base: we never have to
ignore this rule.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -5133,6 +5133,86 @@ def
test_should_return_empty_list_for_updating_same_task_instance_state(
assert response.json() == {"task_instances": [], "total_entries": 0}
+class TestPatchTaskInstanceByTaskGroupId(TestTaskInstanceEndpoint):
Review Comment:
If possible, merge this into `TestPatchTaskInstance`. 1 test class = 1
endpoint. It's the same endpoint, tested with a different parameter.
##########
airflow-core/src/airflow/ui/src/queries/useBulkUpdateTaskInstances.ts:
##########
@@ -0,0 +1,78 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+import { useQueryClient } from "@tanstack/react-query";
+import { useTranslation } from "react-i18next";
+
+import {
+ useTaskInstanceServiceBulkTaskInstances,
+ useTaskInstanceServiceGetTaskInstancesKey,
+ UseGridServiceGetGridRunsKeyFn,
+ UseGridServiceGetGridTiSummariesKeyFn,
+ useGridServiceGetGridTiSummariesKey,
+} from "openapi/queries";
+import { toaster } from "src/components/ui";
+
+type Props = {
+ readonly affectsMultipleRuns?: boolean;
+ readonly dagId: string;
+ readonly dagRunId: string;
+ readonly onSuccess?: () => void;
+};
+
+export const useBulkUpdateTaskInstances = ({
Review Comment:
What is that for?
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -5133,6 +5133,86 @@ def
test_should_return_empty_list_for_updating_same_task_instance_state(
assert response.json() == {"task_instances": [], "total_entries": 0}
+class TestPatchTaskInstanceByTaskGroupId(TestTaskInstanceEndpoint):
+ """Tests for patch_task_instance with task_group_id query parameter."""
+
+ DAG_ID = "example_task_group"
+ RUN_ID = "TEST_DAG_RUN_ID"
+ TASK_GROUP_ID = "section_1"
+ # We need a task_id as path parameter, using start task from
example_task_group DAG
+ TASK_ID = "start"
+ NEW_STATE = "failed"
+
+ def test_should_update_task_group_instances_state(self, test_client,
session):
Review Comment:
We probably need to add a test for updating the 'note'. All TIs should have
the same note. It would be confusing for users to have the 'note' work only for
a single TI and not for groups.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -5133,6 +5133,86 @@ def
test_should_return_empty_list_for_updating_same_task_instance_state(
assert response.json() == {"task_instances": [], "total_entries": 0}
+class TestPatchTaskInstanceByTaskGroupId(TestTaskInstanceEndpoint):
+ """Tests for patch_task_instance with task_group_id query parameter."""
+
+ DAG_ID = "example_task_group"
+ RUN_ID = "TEST_DAG_RUN_ID"
+ TASK_GROUP_ID = "section_1"
+ # We need a task_id as path parameter, using start task from
example_task_group DAG
+ TASK_ID = "start"
+ NEW_STATE = "failed"
+
+ def test_should_update_task_group_instances_state(self, test_client,
session):
+ """Test updating all task instances in a task group using
task_group_id query param."""
+ self.create_task_instances(session, dag_id=self.DAG_ID)
+
+ response = test_client.patch(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}",
+ params={"task_group_id": self.TASK_GROUP_ID},
+ json={"new_state": self.NEW_STATE},
+ )
+ assert response.status_code == 200, response.text
+ response_data = response.json()
+ # section_1 has 3 tasks: task_1, task_2, task_3
+ assert response_data["total_entries"] == 3
+ for ti in response_data["task_instances"]:
+ assert ti["state"] == self.NEW_STATE
+
+ def test_should_raise_404_for_non_existent_task_group(self, test_client,
session):
+ """Test that non-existent task group returns 404."""
+ self.create_task_instances(session, dag_id=self.DAG_ID)
+
+ response = test_client.patch(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}",
+ params={"task_group_id": "non_existent_group"},
+ json={"new_state": self.NEW_STATE},
+ )
+ assert response.status_code == 404
+ assert "not found in DAG" in response.text
+
+ def test_should_update_nested_task_group_instances_state(self,
test_client, session):
+ """Test updating task instances in a nested task group."""
+ self.create_task_instances(session, dag_id=self.DAG_ID)
+
+ # section_2 has task_1 + inner_section_2 (task_2, task_3, task_4) = 4
tasks
+ response = test_client.patch(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}",
+ params={"task_group_id": "section_2"},
+ json={"new_state": self.NEW_STATE},
+ )
+ assert response.status_code == 200, response.text
+ response_data = response.json()
+ assert response_data["total_entries"] == 4
Review Comment:
Assert the task id and state of the response tasks.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -5133,6 +5133,86 @@ def
test_should_return_empty_list_for_updating_same_task_instance_state(
assert response.json() == {"task_instances": [], "total_entries": 0}
+class TestPatchTaskInstanceByTaskGroupId(TestTaskInstanceEndpoint):
+ """Tests for patch_task_instance with task_group_id query parameter."""
+
+ DAG_ID = "example_task_group"
+ RUN_ID = "TEST_DAG_RUN_ID"
+ TASK_GROUP_ID = "section_1"
+ # We need a task_id as path parameter, using start task from
example_task_group DAG
+ TASK_ID = "start"
+ NEW_STATE = "failed"
+
+ def test_should_update_task_group_instances_state(self, test_client,
session):
+ """Test updating all task instances in a task group using
task_group_id query param."""
+ self.create_task_instances(session, dag_id=self.DAG_ID)
+
+ response = test_client.patch(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}",
+ params={"task_group_id": self.TASK_GROUP_ID},
+ json={"new_state": self.NEW_STATE},
+ )
+ assert response.status_code == 200, response.text
+ response_data = response.json()
+ # section_1 has 3 tasks: task_1, task_2, task_3
+ assert response_data["total_entries"] == 3
+ for ti in response_data["task_instances"]:
+ assert ti["state"] == self.NEW_STATE
+
+ def test_should_raise_404_for_non_existent_task_group(self, test_client,
session):
+ """Test that non-existent task group returns 404."""
+ self.create_task_instances(session, dag_id=self.DAG_ID)
+
+ response = test_client.patch(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}",
+ params={"task_group_id": "non_existent_group"},
+ json={"new_state": self.NEW_STATE},
+ )
+ assert response.status_code == 404
+ assert "not found in DAG" in response.text
+
+ def test_should_update_nested_task_group_instances_state(self,
test_client, session):
+ """Test updating task instances in a nested task group."""
+ self.create_task_instances(session, dag_id=self.DAG_ID)
+
+ # section_2 has task_1 + inner_section_2 (task_2, task_3, task_4) = 4
tasks
+ response = test_client.patch(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}",
+ params={"task_group_id": "section_2"},
+ json={"new_state": self.NEW_STATE},
+ )
+ assert response.status_code == 200, response.text
+ response_data = response.json()
+ assert response_data["total_entries"] == 4
+
+ def test_dry_run_should_return_affected_task_instances(self, test_client,
session):
+ """Test dry run returns affected task instances without making
changes."""
+ self.create_task_instances(session, dag_id=self.DAG_ID)
+
+ response = test_client.patch(
+
f"/dags/{self.DAG_ID}/dagRuns/{self.RUN_ID}/taskInstances/{self.TASK_ID}/dry_run",
+ params={"task_group_id": self.TASK_GROUP_ID},
+ json={"new_state": self.NEW_STATE},
+ )
+ assert response.status_code == 200, response.text
+ response_data = response.json()
+ # section_1 has 3 tasks
+ assert response_data["total_entries"] == 3
+
+ # Verify task_ids belong to section_1
+ for ti in response_data["task_instances"]:
+ assert ti["task_id"].startswith("section_1.")
Review Comment:
Merge this with the previous test
##########
airflow-core/src/airflow/ui/src/queries/useBulkUpdateTaskInstancesDryRun.ts:
##########
@@ -0,0 +1,65 @@
+/*!
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
Review Comment:
What is this file for? Seems unrelated.
##########
airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py:
##########
@@ -5871,3 +5951,209 @@ def test_should_respond_403(self,
unauthorized_test_client):
def test_should_respond_422(self, test_client):
response = test_client.patch(self.ENDPOINT_URL, json={})
assert response.status_code == 422
+
+
+class TestBulkTaskInstancesDryRun(TestTaskInstanceEndpoint):
Review Comment:
"Bulk" is not appropriate: it qualifies the bulk update endpoint, which is
another endpoint. This should be `TestPatchTaskInstanceDryRunByTaskGroupId`, and it
should probably be merged into the existing `TestPatchTaskInstanceDryRun` test
class.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]