This is an automated email from the ASF dual-hosted git repository.

jedcunningham pushed a commit to branch v2-2-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit e9c49265ef9b3c3e4fd2fa7caf29799c81a8dda5 Author: Ephraim Anierobi <splendidzig...@gmail.com> AuthorDate: Wed Oct 20 19:56:44 2021 +0100 Update taskinstance REST API schema to include dag_run_id field (#19105) This PR adds dag_run_id field to taskinstance schema (cherry picked from commit cc3b062a2bdca16a7b239e73c4dc9e2a3a43c4f0) --- airflow/api_connexion/openapi/v1.yaml | 6 +++ .../api_connexion/schemas/task_instance_schema.py | 47 +++++++++++++--------- .../endpoints/test_task_instance_endpoint.py | 3 ++ .../schemas/test_task_instance_schema.py | 2 + 4 files changed, 38 insertions(+), 20 deletions(-) diff --git a/airflow/api_connexion/openapi/v1.yaml b/airflow/api_connexion/openapi/v1.yaml index a663e82..f34597e 100644 --- a/airflow/api_connexion/openapi/v1.yaml +++ b/airflow/api_connexion/openapi/v1.yaml @@ -2291,6 +2291,12 @@ components: type: string dag_id: type: string + dag_run_id: + type: string + description: | + The DagRun ID for this task instance + + *New in version 2.2.1* execution_date: type: string format: datetime diff --git a/airflow/api_connexion/schemas/task_instance_schema.py b/airflow/api_connexion/schemas/task_instance_schema.py index 89ae9a6..956246b 100644 --- a/airflow/api_connexion/schemas/task_instance_schema.py +++ b/airflow/api_connexion/schemas/task_instance_schema.py @@ -19,6 +19,7 @@ from typing import List, NamedTuple, Optional, Tuple from marshmallow import Schema, ValidationError, fields, validate, validates_schema from marshmallow.utils import get_value +from marshmallow_sqlalchemy import SQLAlchemySchema, auto_field from airflow.api_connexion.parameters import validate_istimezone from airflow.api_connexion.schemas.enum_schemas import TaskInstanceStateField @@ -27,28 +28,34 @@ from airflow.models import SlaMiss, TaskInstance from airflow.utils.state import State -class TaskInstanceSchema(Schema): +class TaskInstanceSchema(SQLAlchemySchema): """Task instance schema""" - task_id = fields.Str() - dag_id = fields.Str() - 
execution_date = fields.DateTime(validate=validate_istimezone) - start_date = fields.DateTime(validate=validate_istimezone) - end_date = fields.DateTime(validate=validate_istimezone) - duration = fields.Float() - state = TaskInstanceStateField() - _try_number = fields.Int(data_key="try_number") - max_tries = fields.Int() - hostname = fields.Str() - unixname = fields.Str() - pool = fields.Str() - pool_slots = fields.Int() - queue = fields.Str() - priority_weight = fields.Int() - operator = fields.Str() - queued_dttm = fields.DateTime(data_key="queued_when") - pid = fields.Int() - executor_config = fields.Str() + class Meta: + """Meta""" + + model = TaskInstance + + task_id = auto_field() + dag_id = auto_field() + run_id = auto_field(data_key="dag_run_id") + execution_date = auto_field() + start_date = auto_field() + end_date = auto_field() + duration = auto_field() + state = auto_field() + _try_number = auto_field(data_key="try_number") + max_tries = auto_field() + hostname = auto_field() + unixname = auto_field() + pool = auto_field() + pool_slots = auto_field() + queue = auto_field() + priority_weight = auto_field() + operator = auto_field() + queued_dttm = auto_field(data_key="queued_when") + pid = auto_field() + executor_config = auto_field() sla_miss = fields.Nested(SlaMissSchema, dump_default=None) def get_attribute(self, obj, attr, default): diff --git a/tests/api_connexion/endpoints/test_task_instance_endpoint.py b/tests/api_connexion/endpoints/test_task_instance_endpoint.py index 719ae35..18b0bf1 100644 --- a/tests/api_connexion/endpoints/test_task_instance_endpoint.py +++ b/tests/api_connexion/endpoints/test_task_instance_endpoint.py @@ -197,6 +197,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint): "task_id": "print_the_context", "try_number": 0, "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", } def test_should_respond_200_with_task_state_in_removed(self, session): @@ -227,6 +228,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint): 
"task_id": "print_the_context", "try_number": 0, "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", } def test_should_respond_200_task_instance_with_sla(self, session): @@ -275,6 +277,7 @@ class TestGetTaskInstance(TestTaskInstanceEndpoint): "task_id": "print_the_context", "try_number": 0, "unixname": getuser(), + "dag_run_id": "TEST_DAG_RUN_ID", } def test_should_raises_401_unauthenticated(self): diff --git a/tests/api_connexion/schemas/test_task_instance_schema.py b/tests/api_connexion/schemas/test_task_instance_schema.py index 883d936..e13defb 100644 --- a/tests/api_connexion/schemas/test_task_instance_schema.py +++ b/tests/api_connexion/schemas/test_task_instance_schema.py @@ -88,6 +88,7 @@ class TestTaskInstanceSchema: "task_id": "TEST_TASK_ID", "try_number": 0, "unixname": getuser(), + "dag_run_id": None, } assert serialized_ti == expected_json @@ -132,6 +133,7 @@ class TestTaskInstanceSchema: "task_id": "TEST_TASK_ID", "try_number": 0, "unixname": getuser(), + "dag_run_id": None, } assert serialized_ti == expected_json