This is an automated email from the ASF dual-hosted git repository.

pierrejeambrun pushed a commit to branch v3-0-test in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/v3-0-test by this push: new fdc7067e043 [v3-0-test] Fix patch_task_instance endpoint (#50550) (#50591) fdc7067e043 is described below commit fdc7067e04302e44acb7c95575c31f74ce285b0c Author: Pierre Jeambrun <pierrejb...@gmail.com> AuthorDate: Wed May 14 15:49:30 2025 +0200 [v3-0-test] Fix patch_task_instance endpoint (#50550) (#50591) * Fix patch_task_instance endpoint (#50550) (cherry picked from commit db89cc0218f8df737410dbe2eddceaa91739ce49) * Fix CI --- .../core_api/openapi/v1-rest-api-generated.yaml | 22 +- .../core_api/routes/public/task_instances.py | 21 +- .../src/airflow/ui/openapi-gen/queries/queries.ts | 4 +- .../ui/openapi-gen/requests/services.gen.ts | 4 +- .../airflow/ui/openapi-gen/requests/types.gen.ts | 16 +- .../core_api/routes/public/test_task_instances.py | 360 +++++++++++---------- 6 files changed, 236 insertions(+), 191 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml index 4ea3c207c3f..01564724a81 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v1-rest-api-generated.yaml @@ -4484,8 +4484,9 @@ paths: in: query required: false schema: - type: integer - default: -1 + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query @@ -4509,7 +4510,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/TaskInstanceResponse' + $ref: '#/components/schemas/TaskInstanceCollectionResponse' '401': content: application/json: @@ -5131,7 +5132,9 @@ paths: in: path required: true schema: - type: integer + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query @@ -5155,7 +5158,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/TaskInstanceResponse' + $ref: 
'#/components/schemas/TaskInstanceCollectionResponse' '401': content: application/json: @@ -5724,7 +5727,9 @@ paths: in: path required: true schema: - type: integer + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query @@ -5811,8 +5816,9 @@ paths: in: query required: false schema: - type: integer - default: -1 + anyOf: + - type: integer + - type: 'null' title: Map Index - name: update_mask in: query diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py index 65d8c6bc4d1..4a39921eedd 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_instances.py @@ -790,10 +790,13 @@ def patch_task_instance_dry_run( dag_bag: DagBagDep, body: PatchTaskInstanceBody, session: SessionDep, - map_index: int = -1, + map_index: int | None = None, update_mask: list[str] | None = Query(None), ) -> TaskInstanceCollectionResponse: """Update a task instance dry_run mode.""" + if map_index is None: + map_index = -1 + dag, ti, data = _patch_ti_validate_request( dag_id, dag_run_id, task_id, dag_bag, body, session, map_index, update_mask ) @@ -859,10 +862,13 @@ def patch_task_instance( body: PatchTaskInstanceBody, user: GetUserDep, session: SessionDep, - map_index: int = -1, + map_index: int | None = None, update_mask: list[str] | None = Query(None), -) -> TaskInstanceResponse: +) -> TaskInstanceCollectionResponse: """Update a task instance.""" + if map_index is None: + map_index = -1 + dag, ti, data = _patch_ti_validate_request( dag_id, dag_run_id, task_id, dag_bag, body, session, map_index, update_mask ) @@ -909,4 +915,11 @@ def patch_task_instance( ti.task_instance_note.user_id = user.get_id() session.commit() - return TaskInstanceResponse.model_validate(ti) + return TaskInstanceCollectionResponse( + task_instances=[ + 
TaskInstanceResponse.model_validate( + ti, + ) + ], + total_entries=1, + ) diff --git a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts index ae5dabdcd0d..92d6b15d7d8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts @@ -4001,7 +4001,7 @@ export const useDagServicePatchDag = < * @param data.requestBody * @param data.mapIndex * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ export const useTaskInstanceServicePatchTaskInstance = < @@ -4060,7 +4060,7 @@ export const useTaskInstanceServicePatchTaskInstance = < * @param data.mapIndex * @param data.requestBody * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ export const useTaskInstanceServicePatchTaskInstance1 = < diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts index bfd0077783a..c4c6b7bb544 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/services.gen.ts @@ -1918,7 +1918,7 @@ export class TaskInstanceService { * @param data.requestBody * @param data.mapIndex * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse Successful Response * @throws ApiError */ public static patchTaskInstance(data: PatchTaskInstanceData): CancelablePromise<PatchTaskInstanceResponse> { @@ -2192,7 +2192,7 @@ export class TaskInstanceService { * @param data.mapIndex * @param data.requestBody * @param data.updateMask - * @returns TaskInstanceResponse Successful Response + * @returns TaskInstanceCollectionResponse 
Successful Response * @throws ApiError */ public static patchTaskInstance1( diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index d34e86cdb55..458383b693a 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -2197,13 +2197,13 @@ export type GetTaskInstanceResponse = TaskInstanceResponse; export type PatchTaskInstanceData = { dagId: string; dagRunId: string; - mapIndex?: number; + mapIndex?: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array<string> | null; }; -export type PatchTaskInstanceResponse = TaskInstanceResponse; +export type PatchTaskInstanceResponse = TaskInstanceCollectionResponse; export type GetMappedTaskInstancesData = { dagId: string; @@ -2281,13 +2281,13 @@ export type GetMappedTaskInstanceResponse = TaskInstanceResponse; export type PatchTaskInstance1Data = { dagId: string; dagRunId: string; - mapIndex: number; + mapIndex: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array<string> | null; }; -export type PatchTaskInstance1Response = TaskInstanceResponse; +export type PatchTaskInstance1Response = TaskInstanceCollectionResponse; export type GetTaskInstancesData = { dagId: string; @@ -2356,7 +2356,7 @@ export type PostClearTaskInstancesResponse = TaskInstanceCollectionResponse; export type PatchTaskInstanceDryRunData = { dagId: string; dagRunId: string; - mapIndex: number; + mapIndex: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array<string> | null; @@ -2367,7 +2367,7 @@ export type PatchTaskInstanceDryRunResponse = TaskInstanceCollectionResponse; export type PatchTaskInstanceDryRun1Data = { dagId: string; dagRunId: string; - mapIndex?: number; + mapIndex?: number | null; requestBody: PatchTaskInstanceBody; taskId: string; updateMask?: Array<string> | null; @@ 
-4198,7 +4198,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: TaskInstanceResponse; + 200: TaskInstanceCollectionResponse; /** * Bad Request */ @@ -4393,7 +4393,7 @@ export type $OpenApiTs = { /** * Successful Response */ - 200: TaskInstanceResponse; + 200: TaskInstanceCollectionResponse; /** * Bad Request */ diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py index 75c275b99cc..49769007fa8 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_task_instances.py @@ -3089,38 +3089,43 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint): ) assert response.status_code == 200 assert response.json() == { - "dag_id": self.DAG_ID, - "dag_version": None, - "dag_run_id": self.RUN_ID, - "logical_date": "2020-01-01T00:00:00Z", - "task_id": self.TASK_ID, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "executor": None, - "executor_config": "{}", - "hostname": "", - "id": mock.ANY, - "map_index": -1, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, + "dag_version": None, + "dag_run_id": self.RUN_ID, + "logical_date": "2020-01-01T00:00:00Z", + "task_id": self.TASK_ID, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + 
"id": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": "placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } mock_set_ti_state.assert_called_once_with( @@ -3305,38 +3310,43 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint): "failed", 200, { - "dag_id": "example_python_operator", - "dag_version": None, - "dag_run_id": "TEST_DAG_RUN_ID", - "logical_date": "2020-01-01T00:00:00Z", - "task_id": "print_the_context", - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "executor": None, - "executor_config": "{}", - "hostname": "", - "id": mock.ANY, - "map_index": -1, - "max_tries": 0, - "note": "placeholder-note", - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_display_name": "print_the_context", - "try_number": 0, - "unixname": getuser(), - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": "example_python_operator", + "dag_version": None, + "dag_run_id": "TEST_DAG_RUN_ID", + "logical_date": "2020-01-01T00:00:00Z", + "task_id": "print_the_context", + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "executor": None, + "executor_config": "{}", + "hostname": "", + "id": mock.ANY, + "map_index": -1, + "max_tries": 0, + "note": 
"placeholder-note", + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_display_name": "print_the_context", + "try_number": 0, + "unixname": getuser(), + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, }, 1, ), @@ -3416,40 +3426,45 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint): assert response.status_code == 200, response.text response_data = response.json() assert response_data == { - "dag_id": self.DAG_ID, - "dag_version": None, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "logical_date": "2020-01-01T00:00:00Z", - "id": mock.ANY, - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": -1, - "max_tries": 0, - "note": new_note_value, - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_id": self.TASK_ID, - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "dag_run_id": self.RUN_ID, - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, + "dag_version": None, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "logical_date": "2020-01-01T00:00:00Z", + "id": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": -1, + "max_tries": 0, + "note": new_note_value, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + 
"queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_id": self.TASK_ID, + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "dag_run_id": self.RUN_ID, + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } - _check_task_instance_note(session, response_data["id"], ti_note_data) + _check_task_instance_note(session, response_data["task_instances"][0]["id"], ti_note_data) def test_set_note_should_respond_200(self, test_client, session): self.create_task_instances(session) @@ -3461,42 +3476,47 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint): assert response.status_code == 200, response.text response_data = response.json() assert response_data == { - "dag_id": self.DAG_ID, - "dag_version": None, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "logical_date": "2020-01-01T00:00:00Z", - "id": mock.ANY, - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": -1, - "max_tries": 0, - "note": new_note_value, - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_id": self.TASK_ID, - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "dag_run_id": self.RUN_ID, - "rendered_fields": {}, - "rendered_map_index": None, - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, + "dag_version": None, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "logical_date": "2020-01-01T00:00:00Z", + "id": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": 
-1, + "max_tries": 0, + "note": new_note_value, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_id": self.TASK_ID, + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "dag_run_id": self.RUN_ID, + "rendered_fields": {}, + "rendered_map_index": None, + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } _check_task_instance_note( - session, response_data["id"], {"content": new_note_value, "user_id": "test"} + session, response_data["task_instances"][0]["id"], {"content": new_note_value, "user_id": "test"} ) def test_set_note_should_respond_200_mapped_task_instance_with_rtif(self, test_client, session): @@ -3523,42 +3543,49 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint): response_data = response.json() assert response_data == { - "dag_id": self.DAG_ID, - "dag_version": None, - "duration": 10000.0, - "end_date": "2020-01-03T00:00:00Z", - "logical_date": "2020-01-01T00:00:00Z", - "id": mock.ANY, - "executor": None, - "executor_config": "{}", - "hostname": "", - "map_index": map_index, - "max_tries": 0, - "note": new_note_value, - "operator": "PythonOperator", - "pid": 100, - "pool": "default_pool", - "pool_slots": 1, - "priority_weight": 9, - "queue": "default_queue", - "queued_when": None, - "scheduled_when": None, - "start_date": "2020-01-02T00:00:00Z", - "state": "running", - "task_id": self.TASK_ID, - "task_display_name": self.TASK_ID, - "try_number": 0, - "unixname": getuser(), - "dag_run_id": self.RUN_ID, - "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, - "rendered_map_index": str(map_index), - "run_after": "2020-01-01T00:00:00Z", - "trigger": None, - "triggerer_job": None, + "task_instances": [ + { + "dag_id": self.DAG_ID, 
+ "dag_version": None, + "duration": 10000.0, + "end_date": "2020-01-03T00:00:00Z", + "logical_date": "2020-01-01T00:00:00Z", + "id": mock.ANY, + "executor": None, + "executor_config": "{}", + "hostname": "", + "map_index": map_index, + "max_tries": 0, + "note": new_note_value, + "operator": "PythonOperator", + "pid": 100, + "pool": "default_pool", + "pool_slots": 1, + "priority_weight": 9, + "queue": "default_queue", + "queued_when": None, + "scheduled_when": None, + "start_date": "2020-01-02T00:00:00Z", + "state": "running", + "task_id": self.TASK_ID, + "task_display_name": self.TASK_ID, + "try_number": 0, + "unixname": getuser(), + "dag_run_id": self.RUN_ID, + "rendered_fields": {"op_args": [], "op_kwargs": {}, "templates_dict": None}, + "rendered_map_index": str(map_index), + "run_after": "2020-01-01T00:00:00Z", + "trigger": None, + "triggerer_job": None, + } + ], + "total_entries": 1, } _check_task_instance_note( - session, response_data["id"], {"content": new_note_value, "user_id": "test"} + session, + response_data["task_instances"][0]["id"], + {"content": new_note_value, "user_id": "test"}, ) def test_set_note_should_respond_200_when_note_is_empty(self, test_client, session): @@ -3574,10 +3601,9 @@ class TestPatchTaskInstance(TestTaskInstanceEndpoint): ) assert response.status_code == 200, response.text response_data = response.json() - assert response_data["note"] == new_note_value - _check_task_instance_note( - session, response_data["id"], {"content": new_note_value, "user_id": "test"} - ) + response_ti = response_data["task_instances"][0] + assert response_ti["note"] == new_note_value + _check_task_instance_note(session, response_ti["id"], {"content": new_note_value, "user_id": "test"}) @mock.patch("airflow.models.dag.DAG.set_task_instance_state") def test_should_raise_409_for_updating_same_task_instance_state(