This is an automated email from the ASF dual-hosted git repository. ephraimanierobi pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 815d1122cf AIP-64: Add task instance history list endpoint (#40988) 815d1122cf is described below commit 815d1122cfc04ca5d4c3bcb270bc9bfcde63cf9a Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Mon Jul 29 17:28:43 2024 +0100 AIP-64: Add task instance history list endpoint (#40988) This endpoint lists all tries of a task instance: its history records plus the current try. It is like the task instance list endpoint, but for task instance history. It will be used to replace the UI's task instance history endpoint. --- .../endpoints/task_instance_endpoint.py | 53 +++++++++- airflow/api_connexion/openapi/v1.yaml | 64 ++++++++++++ .../api_connexion/schemas/task_instance_schema.py | 15 +++ airflow/www/static/js/types/api-generated.ts | 108 +++++++++++++++++++++ .../endpoints/test_task_instance_endpoint.py | 81 ++++++++++++++++ 5 files changed, 319 insertions(+), 2 deletions(-) diff --git a/airflow/api_connexion/endpoints/task_instance_endpoint.py b/airflow/api_connexion/endpoints/task_instance_endpoint.py index 8094c1863e..a9213d2c24 100644 --- a/airflow/api_connexion/endpoints/task_instance_endpoint.py +++ b/airflow/api_connexion/endpoints/task_instance_endpoint.py @@ -30,6 +30,7 @@ from airflow.api_connexion.exceptions import BadRequest, NotFound, PermissionDen from airflow.api_connexion.parameters import format_datetime, format_parameters from airflow.api_connexion.schemas.task_instance_schema import ( TaskInstanceCollection, + TaskInstanceHistoryCollection, TaskInstanceReferenceCollection, clear_task_instance_form, set_single_task_instance_state_form, @@ -38,6 +39,7 @@ from airflow.api_connexion.schemas.task_instance_schema import ( task_dependencies_collection_schema, task_instance_batch_form, task_instance_collection_schema, + task_instance_history_collection_schema, task_instance_history_schema, task_instance_reference_collection_schema, task_instance_reference_schema, @@ -49,6 +51,7 @@ from airflow.exceptions import TaskNotFound
from airflow.models import SlaMiss from airflow.models.dagrun import DagRun as DR from airflow.models.taskinstance import TaskInstance as TI, clear_task_instances +from airflow.models.taskinstancehistory import TaskInstanceHistory as TIH from airflow.utils.airflow_flask_app import get_airflow_app from airflow.utils.db import get_query_count from airflow.utils.session import NEW_SESSION, provide_session @@ -769,7 +772,6 @@ def get_task_instance_try_details( session: Session = NEW_SESSION, ) -> APIResponse: """Get details of a task instance try.""" - from airflow.models.taskinstancehistory import TaskInstanceHistory def _query(orm_object): query = select(orm_object).where( @@ -788,7 +790,7 @@ def get_task_instance_try_details( ) return result - result = _query(TI) or _query(TaskInstanceHistory) + result = _query(TI) or _query(TIH) if result is None: error_message = f"Task Instance not found for dag_id={dag_id}, run_id={dag_run_id}, task_id={task_id}, map_index={map_index}, try_number={task_try_number}." 
raise NotFound("Task instance not found", detail=error_message) @@ -814,3 +816,50 @@ def get_mapped_task_instance_try_details( map_index=map_index, session=session, ) + + +@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE) +@provide_session +def get_task_instance_tries( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + map_index: int = -1, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get list of task instances history.""" + + def _query(orm_object): + query = select(orm_object).where( + orm_object.dag_id == dag_id, + orm_object.run_id == dag_run_id, + orm_object.task_id == task_id, + orm_object.map_index == map_index, + ) + return query + + task_instances = session.scalars(_query(TIH)).all() + session.scalars(_query(TI)).all() + return task_instance_history_collection_schema.dump( + TaskInstanceHistoryCollection(task_instances=task_instances, total_entries=len(task_instances)) + ) + + +@security.requires_access_dag("GET", DagAccessEntity.TASK_INSTANCE) +@provide_session +def get_mapped_task_instance_tries( + *, + dag_id: str, + dag_run_id: str, + task_id: str, + map_index: int, + session: Session = NEW_SESSION, +) -> APIResponse: + """Get list of mapped task instances history.""" + return get_task_instance_tries( + dag_id=dag_id, + dag_run_id=dag_run_id, + task_id=task_id, + map_index=map_index, + session=session, + ) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index ed98e5c0d8..9a2d66eb5c 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -1751,6 +1751,69 @@ paths: "404": $ref: "#/components/responses/NotFound" + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries: + get: + summary: List task instance tries + description: | + Get details of all task instance tries. 
+ + *New in version 2.10.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_task_instance_tries + tags: [TaskInstance] + parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/PageLimit" + - $ref: "#/components/parameters/PageOffset" + - $ref: "#/components/parameters/OrderBy" + responses: + "200": + description: Success. + content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstanceCollection" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries: + get: + summary: List mapped task instance tries + description: | + Get details of all task instance tries. + + *New in version 2.10.0* + x-openapi-router-controller: airflow.api_connexion.endpoints.task_instance_endpoint + operationId: get_mapped_task_instance_tries + tags: [TaskInstance] + parameters: + - $ref: "#/components/parameters/DAGID" + - $ref: "#/components/parameters/DAGRunID" + - $ref: "#/components/parameters/TaskID" + - $ref: "#/components/parameters/MapIndex" + - $ref: "#/components/parameters/PageLimit" + - $ref: "#/components/parameters/PageOffset" + - $ref: "#/components/parameters/OrderBy" + responses: + "200": + description: Success. 
+ content: + application/json: + schema: + $ref: "#/components/schemas/TaskInstanceCollection" + "401": + $ref: "#/components/responses/Unauthenticated" + "403": + $ref: "#/components/responses/PermissionDenied" + "404": + $ref: "#/components/responses/NotFound" + /dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}: get: summary: get mapped taskinstance try @@ -1781,6 +1844,7 @@ paths: "404": $ref: "#/components/responses/NotFound" + /variables: get: summary: List variables diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 811f92e0a2..5d0eb72091 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -133,6 +133,20 @@ class TaskInstanceCollectionSchema(Schema): total_entries = fields.Int() +class TaskInstanceHistoryCollection(NamedTuple): + """List of task instances history with metadata.""" + + task_instances: list[TaskInstanceHistory | None] + total_entries: int + + +class TaskInstanceHistoryCollectionSchema(Schema): + """Task instance collection schema.""" + + task_instances = fields.List(fields.Nested(TaskInstanceHistorySchema)) + total_entries = fields.Int() + + class TaskInstanceBatchFormSchema(Schema): """Schema for the request form passed to Task Instance Batch endpoint.""" @@ -279,3 +293,4 @@ task_instance_reference_schema = TaskInstanceReferenceSchema() task_instance_reference_collection_schema = TaskInstanceReferenceCollectionSchema() set_task_instance_note_form_schema = SetTaskInstanceNoteFormSchema() task_instance_history_schema = TaskInstanceHistorySchema() +task_instance_history_collection_schema = TaskInstanceHistoryCollectionSchema() diff --git a/airflow/www/static/js/types/api-generated.ts b/airflow/www/static/js/types/api-generated.ts index 6ec7e3d1ff..6cda5f6265 100644 --- a/airflow/www/static/js/types/api-generated.ts +++ 
b/airflow/www/static/js/types/api-generated.ts @@ -555,6 +555,22 @@ export interface paths { */ get: operations["get_task_instance_try_details"]; }; + "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries": { + /** + * Get details of all task instance tries. + * + * *New in version 2.10.0* + */ + get: operations["get_task_instance_tries"]; + }; + "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries": { + /** + * Get details of all task instance tries. + * + * *New in version 2.10.0* + */ + get: operations["get_mapped_task_instance_tries"]; + }; "/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number}": { /** * Get details of a mapped task instance try. @@ -4320,6 +4336,90 @@ export interface operations { 404: components["responses"]["NotFound"]; }; }; + /** + * Get details of all task instance tries. + * + * *New in version 2.10.0* + */ + get_task_instance_tries: { + parameters: { + path: { + /** The DAG ID. */ + dag_id: components["parameters"]["DAGID"]; + /** The DAG run ID. */ + dag_run_id: components["parameters"]["DAGRunID"]; + /** The task ID. */ + task_id: components["parameters"]["TaskID"]; + }; + query: { + /** The numbers of items to return. */ + limit?: components["parameters"]["PageLimit"]; + /** The number of items to skip before starting to collect the result set. */ + offset?: components["parameters"]["PageOffset"]; + /** + * The name of the field to order the results by. + * Prefix a field name with `-` to reverse the sort order. + * + * *New in version 2.1.0* + */ + order_by?: components["parameters"]["OrderBy"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["TaskInstanceCollection"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; + /** + * Get details of all task instance tries. 
+ * + * *New in version 2.10.0* + */ + get_mapped_task_instance_tries: { + parameters: { + path: { + /** The DAG ID. */ + dag_id: components["parameters"]["DAGID"]; + /** The DAG run ID. */ + dag_run_id: components["parameters"]["DAGRunID"]; + /** The task ID. */ + task_id: components["parameters"]["TaskID"]; + /** The map index. */ + map_index: components["parameters"]["MapIndex"]; + }; + query: { + /** The numbers of items to return. */ + limit?: components["parameters"]["PageLimit"]; + /** The number of items to skip before starting to collect the result set. */ + offset?: components["parameters"]["PageOffset"]; + /** + * The name of the field to order the results by. + * Prefix a field name with `-` to reverse the sort order. + * + * *New in version 2.1.0* + */ + order_by?: components["parameters"]["OrderBy"]; + }; + }; + responses: { + /** Success. */ + 200: { + content: { + "application/json": components["schemas"]["TaskInstanceCollection"]; + }; + }; + 401: components["responses"]["Unauthenticated"]; + 403: components["responses"]["PermissionDenied"]; + 404: components["responses"]["NotFound"]; + }; + }; /** * Get details of a mapped task instance try. 
* @@ -5740,6 +5840,14 @@ export type GetTaskInstancesBatchVariables = CamelCasedPropertiesDeep< export type GetTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep< operations["get_task_instance_try_details"]["parameters"]["path"] >; +export type GetTaskInstanceTriesVariables = CamelCasedPropertiesDeep< + operations["get_task_instance_tries"]["parameters"]["path"] & + operations["get_task_instance_tries"]["parameters"]["query"] +>; +export type GetMappedTaskInstanceTriesVariables = CamelCasedPropertiesDeep< + operations["get_mapped_task_instance_tries"]["parameters"]["path"] & + operations["get_mapped_task_instance_tries"]["parameters"]["query"] +>; export type GetMappedTaskInstanceTryDetailsVariables = CamelCasedPropertiesDeep< operations["get_mapped_task_instance_try_details"]["parameters"]["path"] >; diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index e77f75c303..b3e95c33b3 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -2942,3 +2942,84 @@ class TestGetTaskInstanceTry(TestTaskInstanceEndpoint): ) assert response.status_code == 404 assert response.json["title"] == "Task instance not found" + + +class TestGetTaskInstanceTries(TestTaskInstanceEndpoint): + def setup_method(self): + clear_db_runs() + + def teardown_method(self): + clear_db_runs() + + def test_should_respond_200(self, session): + self.create_task_instances( + session=session, task_instances=[{"state": State.SUCCESS}], with_ti_history=True + ) + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert response.json["total_entries"] == 2 # The task instance and its history + assert len(response.json["task_instances"]) == 2 + + def 
test_mapped_task_should_respond_200(self, session): + tis = self.create_task_instances(session, task_instances=[{"state": State.FAILED}]) + old_ti = tis[0] + for idx in (1, 2): + ti = TaskInstance(task=old_ti.task, run_id=old_ti.run_id, map_index=idx) + ti.try_number = 1 + session.add(ti) + session.commit() + tis = session.query(TaskInstance).all() + + # Record the task instance history + from airflow.models.taskinstance import clear_task_instances + + clear_task_instances(tis, session) + # Simulate the try_number increasing to new values in TI + for ti in tis: + if ti.map_index > 0: + ti.try_number += 1 + ti.queue = "default_queue" + session.merge(ti) + session.commit() + + # in each loop, we should get the right mapped TI back + for map_index in (1, 2): + # Get the info from TIHistory: try_number 1, try_number 2 is TI table(latest) + response = self.client.get( + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances" + f"/print_the_context/{map_index}/tries", + environ_overrides={"REMOTE_USER": "test"}, + ) + assert response.status_code == 200 + assert ( + response.json["total_entries"] == 2 + ) # the mapped task was cleared. 
So both the task instance and its history + assert len(response.json["task_instances"]) == 2 + + @pytest.mark.parametrize( + "url", + [ + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries", + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/0/tries", + ], + ) + def test_should_raises_401_unauthenticated(self, url): + response = self.client.get(url) + assert_401(response) + + @pytest.mark.parametrize( + "url", + [ + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/tries", + "/api/v1/dags/example_python_operator/dagRuns/TEST_DAG_RUN_ID/taskInstances/print_the_context/0/tries", + ], + ) + def test_should_raise_403_forbidden(self, url): + response = self.client.get( + url, + environ_overrides={"REMOTE_USER": "test_no_permissions"}, + ) + assert response.status_code == 403