This is an automated email from the ASF dual-hosted git repository. kaxilnaik pushed a commit to branch v3-1-test in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 6a440b56c1f9b384696d4e18e47d2c18f339241d Author: Wei Lee <[email protected]> AuthorDate: Sat Sep 13 08:25:28 2025 +0800 refactor(hitl): rename response_at to responded_at (#55535) (cherry picked from commit 4709c18da6819906b3c1e8a371b8a665641c0a3d) --- airflow-core/docs/img/airflow_erd.sha256 | 2 +- airflow-core/docs/img/airflow_erd.svg | 12 +++++----- .../src/airflow/api_fastapi/common/parameters.py | 2 +- .../api_fastapi/core_api/datamodels/hitl.py | 4 ++-- .../api_fastapi/core_api/openapi/_private_ui.yaml | 4 ++-- .../core_api/openapi/v2-rest-api-generated.yaml | 10 ++++----- .../api_fastapi/core_api/routes/public/hitl.py | 4 ++-- .../airflow/api_fastapi/core_api/routes/ui/dags.py | 2 +- .../api_fastapi/execution_api/datamodels/hitl.py | 4 ++-- .../api_fastapi/execution_api/routes/hitl.py | 4 ++-- .../0076_3_1_0_add_human_in_the_loop_response.py | 2 +- airflow-core/src/airflow/models/hitl.py | 6 ++--- .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 10 ++++----- .../airflow/ui/openapi-gen/requests/types.gen.ts | 4 ++-- .../pages/HITLTaskInstances/HITLResponseForm.tsx | 2 +- .../pages/HITLTaskInstances/HITLTaskInstances.tsx | 4 ++-- .../core_api/routes/public/test_hitl.py | 14 ++++++------ .../api_fastapi/core_api/routes/ui/test_dags.py | 2 +- .../execution_api/versions/head/test_hitl.py | 6 ++--- .../src/airflowctl/api/datamodels/generated.py | 4 ++-- .../airflow/providers/standard/operators/hitl.py | 1 + .../airflow/providers/standard/triggers/hitl.py | 14 +++++++++--- .../tests/unit/standard/operators/test_hitl.py | 26 +++++++++++++++++++--- .../tests/unit/standard/triggers/test_hitl.py | 11 +++++---- .../src/airflow/sdk/api/datamodels/_generated.py | 2 +- task-sdk/tests/task_sdk/api/test_client.py | 8 +++---- .../tests/task_sdk/execution_time/test_hitl.py | 8 +++---- 27 files changed, 102 insertions(+), 70 deletions(-) diff --git a/airflow-core/docs/img/airflow_erd.sha256 b/airflow-core/docs/img/airflow_erd.sha256 index b6b5a4a9270..ba060d4f5ee 100644 --- a/airflow-core/docs/img/airflow_erd.sha256 +++ b/airflow-core/docs/img/airflow_erd.sha256 @@ -1 +1 @@ -e491b0c58188f06ab4696cc09c765413065069f90d78e27eb53fdac5e2e92c82 \ No newline at end of file +35e9e07930e138664fb6ff23bc299567a88946734630d84f3d7d95deacf2f4b8 \ No newline at end of file diff --git a/airflow-core/docs/img/airflow_erd.svg b/airflow-core/docs/img/airflow_erd.svg index f08f88ff3f6..6fbf9c224a4 100644 --- a/airflow-core/docs/img/airflow_erd.svg +++ b/airflow-core/docs/img/airflow_erd.svg @@ -1480,13 +1480,13 @@ <text text-anchor="start" x="2303" y="-2088.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text> <text text-anchor="start" x="2354" y="-2088.8" font-family="Helvetica,sans-Serif" font-size="14.00"> NOT NULL</text> <polygon fill="none" stroke="black" points="2197,-2054 2197,-2079 2435,-2079 2435,-2054 2197,-2054"/> -<text text-anchor="start" x="2202" y="-2063.8" font-family="Helvetica,sans-Serif" font-size="14.00">responded_by</text> -<text text-anchor="start" x="2300" y="-2063.8" font-family="Helvetica,sans-Serif" font-size="14.00"> </text> -<text text-anchor="start" x="2305" y="-2063.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text> +<text text-anchor="start" x="2202" y="-2063.8" font-family="Helvetica,sans-Serif" font-size="14.00">responded_at</text> +<text text-anchor="start" x="2296" y="-2063.8" font-family="Helvetica,sans-Serif" font-size="14.00"> </text> +<text text-anchor="start" x="2301" y="-2063.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text> <polygon fill="none" stroke="black" points="2197,-2029 2197,-2054 2435,-2054 2435,-2029 2197,-2029"/> -<text text-anchor="start" x="2202" y="-2038.8" font-family="Helvetica,sans-Serif" font-size="14.00">response_at</text> -<text text-anchor="start" x="2286" y="-2038.8" font-family="Helvetica,sans-Serif" font-size="14.00"> </text> -<text text-anchor="start" x="2291" y="-2038.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [TIMESTAMP]</text> +<text text-anchor="start" x="2202" y="-2038.8" font-family="Helvetica,sans-Serif" font-size="14.00">responded_by</text> +<text text-anchor="start" x="2300" y="-2038.8" font-family="Helvetica,sans-Serif" font-size="14.00"> </text> +<text text-anchor="start" x="2305" y="-2038.8" font-family="Helvetica,sans-Serif" font-size="14.00"> [JSON]</text> <polygon fill="none" stroke="black" points="2197,-2004 2197,-2029 2435,-2029 2435,-2004 2197,-2004"/> <text text-anchor="start" x="2202" y="-2013.8" font-family="Helvetica,sans-Serif" font-size="14.00">subject</text> <text text-anchor="start" x="2253" y="-2013.8" font-family="Helvetica,sans-Serif" font-size="14.00"> </text> diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index d2dce9743f7..c6ff8aa6f46 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -719,7 +719,7 @@ class _PendingActionsFilter(BaseParam[bool]): sql_select(func.count(HITLDetail.ti_id)) .join(TaskInstance, HITLDetail.ti_id == TaskInstance.id) .where( - HITLDetail.response_at.is_(None), + HITLDetail.responded_at.is_(None), TaskInstance.state == TaskInstanceState.DEFERRED, ) .where(TaskInstance.dag_id == DagModel.dag_id) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py index 24a18821f13..f24688c13e5 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/hitl.py @@ -38,7 +38,7 @@ class HITLDetailResponse(BaseModel): """Response of updating a Human-in-the-loop detail.""" responded_by: HITLUser - response_at: datetime + responded_at: datetime chosen_options: list[str] = Field(min_length=1) params_input: Mapping = Field(default_factory=dict) @@ -66,7 +66,7 @@ class HITLDetail(BaseModel): # Response Content Detail responded_by_user: HITLUser | None = None - response_at: datetime | None = None + responded_at: datetime | None = None chosen_options: list[str] | None = None params_input: dict[str, Any] = Field(default_factory=dict) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 383b9fca62f..269f60cd4c3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -2003,12 +2003,12 @@ components: anyOf: - $ref: '#/components/schemas/HITLUser' - type: 'null' - response_at: + responded_at: anyOf: - type: string format: date-time - type: 'null' - title: Response At + title: Responded At chosen_options: anyOf: - items: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 9150f12c012..7c8bba4e9aa 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -10865,12 +10865,12 @@ components: anyOf: - $ref: '#/components/schemas/HITLUser' - type: 'null' - response_at: + responded_at: anyOf: - type: string format: date-time - type: 'null' - title: Response At + title: Responded At chosen_options: anyOf: - items: @@ -10913,10 +10913,10 @@ components: properties: responded_by: $ref: '#/components/schemas/HITLUser' - response_at: + responded_at: type: string format: date-time - title: Response At + title: Responded At chosen_options: items: type: string @@ -10930,7 +10930,7 @@ components: type: object required: - responded_by - - response_at + - responded_at - chosen_options title: HITLDetailResponse description: Response of updating a Human-in-the-loop detail. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 5f39e18dd13..c444a759ef3 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -157,7 +157,7 @@ def update_hitl_detail( ) hitl_detail_model.responded_by = hitl_user - hitl_detail_model.response_at = timezone.utcnow() + hitl_detail_model.responded_at = timezone.utcnow() hitl_detail_model.chosen_options = update_hitl_detail_payload.chosen_options hitl_detail_model.params_input = update_hitl_detail_payload.params_input session.add(hitl_detail_model) @@ -204,7 +204,7 @@ def get_hitl_details( allowed_attrs=[ "ti_id", "subject", - "response_at", + "responded_at", ], model=HITLDetailModel, to_replace={ diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index 9cd5698800a..af4ed341f7f 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -203,7 +203,7 @@ def get_dags( ) .join(TaskInstance, HITLDetail.ti_id == TaskInstance.id) .where( - HITLDetail.response_at.is_(None), + HITLDetail.responded_at.is_(None), TaskInstance.state == TaskInstanceState.DEFERRED, ) .where(TaskInstance.dag_id.in_([dag.dag_id for dag in dags])) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py index a5f16eb8818..4cccbb69e71 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/hitl.py @@ -59,7 +59,7 @@ class HITLDetailResponse(BaseModel): response_received: bool responded_by_user: HITLUser | None = None - response_at: datetime | None + responded_at: datetime | None # It's empty if the user has not yet responded. chosen_options: list[str] | None params_input: dict[str, Any] = Field(default_factory=dict) @@ -77,7 +77,7 @@ class HITLDetailResponse(BaseModel): return HITLDetailResponse( response_received=hitl_detail.response_received, - response_at=hitl_detail.response_at, + responded_at=hitl_detail.responded_at, responded_by_user=hitl_user, chosen_options=hitl_detail.chosen_options, params_input=hitl_detail.params_input or {}, diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py index 85d8e008990..50b9377d2f8 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/hitl.py @@ -78,7 +78,7 @@ def upsert_hitl_detail( # Cleanup the response part of HITLDetail as we only store one response for one task instance. # It normally happens after retry, we keep only the latest response. hitl_detail_model.responded_by = None - hitl_detail_model.response_at = None + hitl_detail_model.responded_at = None hitl_detail_model.chosen_options = None hitl_detail_model.params_input = {} session.add(hitl_detail_model) @@ -117,7 +117,7 @@ def update_hitl_detail( ) hitl_detail_model.responded_by = None - hitl_detail_model.response_at = datetime.now(timezone.utc) + hitl_detail_model.responded_at = datetime.now(timezone.utc) hitl_detail_model.chosen_options = payload.chosen_options hitl_detail_model.params_input = payload.params_input session.add(hitl_detail_model) diff --git a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py index 60f870ff44c..cd875711ca4 100644 --- a/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py +++ b/airflow-core/src/airflow/migrations/versions/0076_3_1_0_add_human_in_the_loop_response.py @@ -60,7 +60,7 @@ def upgrade(): Column("multiple", Boolean, unique=False, default=False), Column("params", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}), Column("assignees", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), - Column("response_at", UtcDateTime, nullable=True), + Column("responded_at", UtcDateTime, nullable=True), Column("responded_by", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), Column("chosen_options", sqlalchemy_jsonfield.JSONField(json=json), nullable=True), Column("params_input", sqlalchemy_jsonfield.JSONField(json=json), nullable=False, default={}), diff --git a/airflow-core/src/airflow/models/hitl.py b/airflow-core/src/airflow/models/hitl.py index 7f0069688c1..f13c4a6117f 100644 --- a/airflow-core/src/airflow/models/hitl.py +++ b/airflow-core/src/airflow/models/hitl.py @@ -99,7 +99,7 @@ class HITLDetail(Base): assignees = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) # Response Content Detail - response_at = Column(UtcDateTime, nullable=True) + responded_at = Column(UtcDateTime, nullable=True) responded_by = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True) chosen_options = Column( sqlalchemy_jsonfield.JSONField(json=json), @@ -125,11 +125,11 @@ class HITLDetail(Base): @hybrid_property def response_received(self) -> bool: - return self.response_at is not None + return self.responded_at is not None @response_received.expression # type: ignore[no-redef] def response_received(cls): - return cls.response_at.is_not(None) + return cls.responded_at.is_not(None) @hybrid_property def responded_by_user_id(self) -> str | None: diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 332d112cb7c..1cf2a46b461 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3588,7 +3588,7 @@ export const $HITLDetail = { } ] }, - response_at: { + responded_at: { anyOf: [ { type: 'string', @@ -3598,7 +3598,7 @@ export const $HITLDetail = { type: 'null' } ], - title: 'Response At' + title: 'Responded At' }, chosen_options: { anyOf: [ @@ -3656,10 +3656,10 @@ export const $HITLDetailResponse = { responded_by: { '$ref': '#/components/schemas/HITLUser' }, - response_at: { + responded_at: { type: 'string', format: 'date-time', - title: 'Response At' + title: 'Responded At' }, chosen_options: { items: { @@ -3676,7 +3676,7 @@ export const $HITLDetailResponse = { } }, type: 'object', - required: ['responded_by', 'response_at', 'chosen_options'], + required: ['responded_by', 'responded_at', 'chosen_options'], title: 'HITLDetailResponse', description: 'Response of updating a Human-in-the-loop detail.' } as const; diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index e88e743d5b5..21a81305bd8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -942,7 +942,7 @@ export type HITLDetail = { }; assigned_users?: Array<HITLUser>; responded_by_user?: HITLUser | null; - response_at?: string | null; + responded_at?: string | null; chosen_options?: Array<(string)> | null; params_input?: { [key: string]: unknown; @@ -963,7 +963,7 @@ export type HITLDetailCollection = { */ export type HITLDetailResponse = { responded_by: HITLUser; - response_at: string; + responded_at: string; chosen_options: Array<(string)>; params_input?: { [key: string]: unknown; diff --git a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx index e703683ce9b..23d10c44713 100644 --- a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx +++ b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLResponseForm.tsx @@ -98,7 +98,7 @@ export const HITLResponseForm = ({ hitlDetail }: HITLResponseFormProps) => { {hitlDetail.response_received ? ( <Text color="fg.muted" fontSize="sm"> {translate("response.received")} - <Time datetime={hitlDetail.response_at} format={DEFAULT_DATETIME_FORMAT} /> + <Time datetime={hitlDetail.responded_at} format={DEFAULT_DATETIME_FORMAT} /> </Text> ) : undefined} <Accordion.Root diff --git a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx index 663b4615f29..29e762ee636 100644 --- a/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx +++ b/airflow-core/src/airflow/ui/src/pages/HITLTaskInstances/HITLTaskInstances.tsx @@ -107,8 +107,8 @@ const taskInstanceColumns = ({ header: translate("common:mapIndex"), }, { - accessorKey: "response_at", - cell: ({ row: { original } }) => <Time datetime={original.response_at} />, + accessorKey: "responded_at", + cell: ({ row: { original } }) => <Time datetime={original.responded_at} />, header: translate("response.received"), }, ]; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py index e20f9fabba4..f29fe60cfef 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_hitl.py @@ -171,7 +171,7 @@ def sample_hitl_details(sample_tis: list[TaskInstance], session: Session) -> lis defaults=["1"], multiple=False, params={"input": 1}, - response_at=utcnow(), + responded_at=utcnow(), chosen_options=[str(i)], params_input={"input": i}, responded_by={"id": "test", "name": "test"}, @@ -210,7 +210,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: "params": {"input_1": 1}, "assigned_users": [], "params_input": {}, - "response_at": None, + "responded_at": None, "chosen_options": None, "response_received": False, "subject": "This is subject", @@ -321,7 +321,7 @@ class TestUpdateHITLDetailEndpoint: "params_input": {"input_1": 2}, "chosen_options": ["Approve"], "responded_by": {"id": "test", "name": "test"}, - "response_at": "2025-07-03T00:00:00Z", + "responded_at": "2025-07-03T00:00:00Z", } audit_log = session.scalar(select(Log)) @@ -350,7 +350,7 @@ class TestUpdateHITLDetailEndpoint: "params_input": {"input_1": 2}, "chosen_options": ["Approve"], "responded_by": {"id": "test", "name": "test"}, - "response_at": "2025-07-03T00:00:00Z", + "responded_at": "2025-07-03T00:00:00Z", } audit_log = session.scalar(select(Log)) @@ -451,7 +451,7 @@ class TestUpdateHITLDetailEndpoint: "params_input": {"input_1": 2}, "chosen_options": ["Approve"], "responded_by": {"id": "test", "name": "test"}, - "response_at": "2025-07-03T00:00:00Z", + "responded_at": "2025-07-03T00:00:00Z", } assert response.status_code == 200 assert response.json() == expected_response @@ -620,7 +620,7 @@ class TestGetHITLDetailsEndpoint: ("task_instance_operator", lambda x: x["task_instance"]["operator_name"]), # htil key ("subject", itemgetter("subject")), - ("response_at", itemgetter("response_at")), + ("responded_at", itemgetter("responded_at")), ], ids=[ "ti_id", @@ -630,7 +630,7 @@ class TestGetHITLDetailsEndpoint: "rendered_map_index", "task_instance_operator", "subject", - "response_at", + "responded_at", ], ) def test_should_respond_200_with_existing_response_and_order_by( diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index 0272b584ab6..d58f29839b6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -159,7 +159,7 @@ class TestGetDagRuns(TestPublicDagEndpoint): options=["Approve", "Reject"], subject=f"This is subject {i}", defaults=["Approve"], - response_at=utcnow(), + responded_at=utcnow(), chosen_options=["Approve"], responded_by={"id": "test", "name": "test"}, ) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py index 33a1b3e79bc..b587b35ee28 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_hitl.py @@ -48,7 +48,7 @@ default_hitl_detail_request_kwargs: dict[str, Any] = { "assignees": None, } expected_empty_hitl_detail_response_part: dict[str, Any] = { - "response_at": None, + "responded_at": None, "chosen_options": None, "responded_by_user": None, "params_input": {}, @@ -91,7 +91,7 @@ def expected_sample_hitl_detail_dict(sample_ti: TaskInstance) -> dict[str, Any]: **default_hitl_detail_request_kwargs, **{ "params_input": {"input_1": 2}, - "response_at": convert_to_utc(datetime(2025, 7, 3, 0, 0, 0)), + "responded_at": convert_to_utc(datetime(2025, 7, 3, 0, 0, 0)), "chosen_options": ["Reject"], "responded_by": None, }, @@ -171,7 +171,7 @@ def test_update_hitl_detail(client: Client, sample_ti: TaskInstance) -> None: assert response.status_code == 200 assert response.json() == { "params_input": {"input_1": 2}, - "response_at": "2025-07-03T00:00:00Z", + "responded_at": "2025-07-03T00:00:00Z", "chosen_options": ["Reject"], "response_received": True, "responded_by_user": None, diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index ccad0f23ed8..31da4e13044 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -1437,7 +1437,7 @@ class HITLDetailResponse(BaseModel): """ responded_by: HITLUser - response_at: Annotated[datetime, Field(title="Response At")] + responded_at: Annotated[datetime, Field(title="Responded At")] chosen_options: Annotated[list[str], Field(min_length=1, title="Chosen Options")] params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None @@ -1838,7 +1838,7 @@ class HITLDetail(BaseModel): params: Annotated[dict[str, Any] | None, Field(title="Params")] = None assigned_users: Annotated[list[HITLUser] | None, Field(title="Assigned Users")] = None responded_by_user: HITLUser | None = None - response_at: Annotated[datetime | None, Field(title="Response At")] = None + responded_at: Annotated[datetime | None, Field(title="Responded At")] = None chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None response_received: Annotated[bool | None, Field(title="Response Received")] = False diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py index f8e1c631ea6..1c5559adf00 100644 --- a/providers/standard/src/airflow/providers/standard/operators/hitl.py +++ b/providers/standard/src/airflow/providers/standard/operators/hitl.py @@ -179,6 +179,7 @@ class HITLOperator(BaseOperator): return HITLTriggerEventSuccessPayload( chosen_options=chosen_options, params_input=params_input, + responded_at=event["responded_at"], responded_by_user=event["responded_by_user"], ) diff --git a/providers/standard/src/airflow/providers/standard/triggers/hitl.py b/providers/standard/src/airflow/providers/standard/triggers/hitl.py index bdaa84b01f6..b0b8ec71ef9 100644 --- a/providers/standard/src/airflow/providers/standard/triggers/hitl.py +++ b/providers/standard/src/airflow/providers/standard/triggers/hitl.py @@ -45,6 +45,7 @@ class HITLTriggerEventSuccessPayload(TypedDict, total=False): chosen_options: list[str] params_input: dict[str, Any] responded_by_user: HITLUser | None + responded_at: datetime timedout: bool @@ -106,18 +107,20 @@ class HITLTrigger(BaseTrigger): if resp.response_received and resp.chosen_options: if TYPE_CHECKING: assert resp.responded_by_user is not None + assert resp.responded_at is not None self.log.info( "[HITL] responded_by=%s (id=%s) options=%s at %s (timeout fallback skipped)", resp.responded_by_user.name, resp.responded_by_user.id, resp.chosen_options, - resp.response_at, + resp.responded_at, ) yield TriggerEvent( HITLTriggerEventSuccessPayload( chosen_options=resp.chosen_options, params_input=resp.params_input or {}, + responded_at=resp.responded_at, responded_by_user=HITLUser( id=resp.responded_by_user.id, name=resp.responded_by_user.name, @@ -136,11 +139,13 @@ class HITLTrigger(BaseTrigger): ) return - await sync_to_async(update_hitl_detail_response)( + resp = await sync_to_async(update_hitl_detail_response)( ti_id=self.ti_id, chosen_options=self.defaults, params_input=self.params, ) + if TYPE_CHECKING: + assert resp.responded_at is not None self.log.info( "[HITL] timeout reached before receiving response, fallback to default %s", self.defaults ) @@ -149,6 +154,7 @@ class HITLTrigger(BaseTrigger): chosen_options=self.defaults, params_input=self.params, responded_by_user=None, + responded_at=resp.responded_at, timedout=True, ) ) @@ -158,17 +164,19 @@ class HITLTrigger(BaseTrigger): if resp.response_received and resp.chosen_options: if TYPE_CHECKING: assert resp.responded_by_user is not None + assert resp.responded_at is not None self.log.info( "[HITL] responded_by=%s (id=%s) options=%s at %s", resp.responded_by_user.name, resp.responded_by_user.id, resp.chosen_options, - resp.response_at, + resp.responded_at, ) yield TriggerEvent( HITLTriggerEventSuccessPayload( chosen_options=resp.chosen_options, params_input=resp.params_input or {}, + responded_at=resp.responded_at, responded_by_user=HITLUser( id=resp.responded_by_user.id, name=resp.responded_by_user.name, diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py index 98d0bf66864..7735a8c04c9 100644 --- a/providers/standard/tests/unit/standard/operators/test_hitl.py +++ b/providers/standard/tests/unit/standard/operators/test_hitl.py @@ -183,7 +183,7 @@ class TestHITLOperator: assert hitl_detail_model.multiple is False assert hitl_detail_model.params == {"input_1": 1} assert hitl_detail_model.assignees == [{"id": "test", "name": "test"}] - assert hitl_detail_model.response_at is None + assert hitl_detail_model.responded_at is None assert hitl_detail_model.responded_by is None assert hitl_detail_model.chosen_options is None assert hitl_detail_model.params_input == {} @@ -230,17 +230,24 @@ class TestHITLOperator: params={"input": 1}, ) + responded_at_dt = timezone.utcnow() + ret = hitl_op.execute_complete( context={}, event={ "chosen_options": ["1"], "params_input": {"input": 2}, + "responded_at": responded_at_dt, "responded_by_user": {"id": "test", "name": "test"}, }, ) - assert ret["chosen_options"] == ["1"] - assert ret["params_input"] == {"input": 2} + assert ret == { + "chosen_options": ["1"], + "params_input": {"input": 2}, + "responded_at": responded_at_dt, + "responded_by_user": {"id": "test", "name": "test"}, + } def test_validate_chosen_options_with_invalid_content(self) -> None: hitl_op = HITLOperator( @@ -401,11 +408,14 @@ class TestApprovalOperator: subject="This is subject", ) + responded_at_dt = timezone.utcnow() + ret = hitl_op.execute_complete( context={}, event={ "chosen_options": ["Approve"], "params_input": {}, + "responded_at": responded_at_dt, "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -413,6 +423,7 @@ class TestApprovalOperator: assert ret == { "chosen_options": ["Approve"], "params_input": {}, + "responded_at": responded_at_dt, "responded_by_user": {"id": "test", "name": "test"}, } @@ -433,6 +444,7 @@ class TestApprovalOperator: event={ "chosen_options": ["Reject"], "params_input": {}, + "responded_at": timezone.utcnow(), "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -495,6 +507,7 @@ class TestHITLBranchOperator: event={ "chosen_options": ["branch_1"], "params_input": {}, + "responded_at": timezone.utcnow(), "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -511,6 +524,8 @@ class TestHITLBranchOperator: branch_op >> [EmptyOperator(task_id=f"branch_{i}") for i in range(1, 6)] + responded_at_dt = timezone.utcnow() + dr = dag_maker.create_dagrun() ti = dr.get_task_instance("make_choice") with pytest.raises(DownstreamTasksSkipped) as exc_info: @@ -519,6 +534,7 @@ class TestHITLBranchOperator: event={ "chosen_options": [f"branch_{i}" for i in range(1, 4)], "params_input": {}, + "responded_at": responded_at_dt, "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -544,6 +560,7 @@ class TestHITLBranchOperator: event={ "chosen_options": ["Approve"], "params_input": {}, + "responded_at": timezone.utcnow(), "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -575,6 +592,7 @@ class TestHITLBranchOperator: event={ "chosen_options": ["Approve", "KeepAsIs"], "params_input": {}, + "responded_at": timezone.utcnow(), "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -600,6 +618,7 @@ class TestHITLBranchOperator: event={ "chosen_options": ["branch_2"], "params_input": {}, + "responded_at": timezone.utcnow(), "responded_by_user": {"id": "test", "name": "test"}, }, ) @@ -625,6 +644,7 @@ class TestHITLBranchOperator: event={ "chosen_options": ["Approve"], "params_input": {}, + "responded_at": timezone.utcnow(), "responded_by_user": {"id": "test", "name": "test"}, }, ) diff --git a/providers/standard/tests/unit/standard/triggers/test_hitl.py b/providers/standard/tests/unit/standard/triggers/test_hitl.py index 118bcb77dc1..6aba3b1c60d 100644 --- a/providers/standard/tests/unit/standard/triggers/test_hitl.py +++ b/providers/standard/tests/unit/standard/triggers/test_hitl.py @@ -81,7 +81,7 @@ class TestHITLTrigger: response_received=False, responded_user_id=None, responded_user_name=None, - response_at=None, + responded_at=None, chosen_options=None, params_input={}, ) @@ -111,7 +111,7 @@ class TestHITLTrigger: mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=False, responded_by_user=None, - response_at=None, + responded_at=None, chosen_options=None, params_input={}, ) @@ -126,6 +126,7 @@ class TestHITLTrigger: chosen_options=["1"], params_input={"input": 1}, responded_by_user=None, + responded_at=mock.ANY, timedout=True, ) ) @@ -154,7 +155,7 @@ class TestHITLTrigger: mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=True, responded_by_user=HITLUser(id="1", name="test"), - response_at=action_datetime, + responded_at=action_datetime, chosen_options=["2"], params_input={}, ) @@ -168,6 +169,7 @@ class TestHITLTrigger: HITLTriggerEventSuccessPayload( chosen_options=["2"], params_input={}, + responded_at=mock.ANY, responded_by_user={"id": "1", "name": "test"}, timedout=False, ) @@ -197,7 +199,7 @@ class TestHITLTrigger: mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=True, responded_by_user=HITLUser(id="test", name="test"), - response_at=utcnow(), + responded_at=utcnow(), chosen_options=["3"], params_input={"input": 50}, ) @@ -210,6 +212,7 @@ class TestHITLTrigger: HITLTriggerEventSuccessPayload( chosen_options=["3"], params_input={"input": 50}, + responded_at=mock.ANY, responded_by_user={"id": "test", "name": "test"}, timedout=False, ) diff --git a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py index d2151d59d02..8e4c4207b27 100644 --- a/task-sdk/src/airflow/sdk/api/datamodels/_generated.py +++ b/task-sdk/src/airflow/sdk/api/datamodels/_generated.py @@ -564,7 +564,7 @@ class HITLDetailResponse(BaseModel): response_received: Annotated[bool, Field(title="Response Received")] responded_by_user: HITLUser | None = None - response_at: Annotated[AwareDatetime | None, Field(title="Response At")] = None + responded_at: Annotated[AwareDatetime | None, Field(title="Responded At")] = None chosen_options: Annotated[list[str] | None, Field(title="Chosen Options")] = None params_input: Annotated[dict[str, Any] | None, Field(title="Params Input")] = None diff --git a/task-sdk/tests/task_sdk/api/test_client.py b/task-sdk/tests/task_sdk/api/test_client.py index 9464296934d..cbd7c80bb01 100644 --- a/task-sdk/tests/task_sdk/api/test_client.py +++ b/task-sdk/tests/task_sdk/api/test_client.py @@ -1305,7 +1305,7 @@ class TestHITLOperations: "params_input": {}, "responded_by_user": {"id": "admin", "name": "admin"}, "response_received": True, - "response_at": "2025-07-03T00:00:00Z", + "responded_at": "2025-07-03T00:00:00Z", }, ) return httpx.Response(status_code=400, json={"detail": "Bad Request"}) @@ -1321,7 +1321,7 @@ class TestHITLOperations: assert result.chosen_options == ["Approval"] assert result.params_input == {} assert result.responded_by_user == HITLUser(id="admin", name="admin") - assert result.response_at == timezone.datetime(2025, 7, 3, 0, 0, 0) + assert result.responded_at == timezone.datetime(2025, 7, 3, 0, 0, 0) def test_get_detail_response(self, time_machine: TimeMachineFixture) -> None: time_machine.move_to(datetime(2025, 7, 3, 0, 0, 0)) @@ -1336,7 +1336,7 @@ class TestHITLOperations: "params_input": {}, "responded_by_user": {"id": "admin", "name": "admin"}, "response_received": True, - "response_at": "2025-07-03T00:00:00Z", + "responded_at": "2025-07-03T00:00:00Z", }, ) return httpx.Response(status_code=400, json={"detail": "Bad Request"}) @@ -1348,4 +1348,4 @@ class TestHITLOperations: assert result.chosen_options == ["Approval"] assert result.params_input == {} assert result.responded_by_user == HITLUser(id="admin", name="admin") - assert result.response_at == timezone.datetime(2025, 7, 3, 0, 0, 0) + assert result.responded_at == timezone.datetime(2025, 7, 3, 0, 0, 0) diff --git a/task-sdk/tests/task_sdk/execution_time/test_hitl.py b/task-sdk/tests/task_sdk/execution_time/test_hitl.py index ad74ade5cd2..cd3682d922e 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_hitl.py +++ b/task-sdk/tests/task_sdk/execution_time/test_hitl.py @@ -62,7 +62,7 @@ def test_update_hitl_detail_response(mock_supervisor_comms) -> None: mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=True, chosen_options=["Approve"], - response_at=timestamp, + responded_at=timestamp, responded_by_user=APIHITLUser(id="admin", name="admin"), params_input={"input_1": 1}, ) @@ -74,7 +74,7 @@ def test_update_hitl_detail_response(mock_supervisor_comms) -> None: assert resp == HITLDetailResponse( response_received=True, chosen_options=["Approve"], - response_at=timestamp, + responded_at=timestamp, responded_by_user=APIHITLUser(id="admin", name="admin"), params_input={"input_1": 1}, ) @@ -84,7 +84,7 @@ def test_get_hitl_detail_content_detail(mock_supervisor_comms) -> None: mock_supervisor_comms.send.return_value = HITLDetailResponse( response_received=False, chosen_options=None, - response_at=None, + responded_at=None, responded_by_user=None, params_input={}, ) @@ -92,7 +92,7 @@ def test_get_hitl_detail_content_detail(mock_supervisor_comms) -> None: assert resp == HITLDetailResponse( response_received=False, chosen_options=None, - response_at=None, + responded_at=None, responded_by_user=None, params_input={}, )
