This is an automated email from the ASF dual-hosted git repository.

weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new fc29492e7ed refactor(hitl): remove unnecessary helper functions and reduce one call through joinedload (#55260)
fc29492e7ed is described below

commit fc29492e7ed814ad0340c76f106089d9b3efa886
Author: Wei Lee <[email protected]>
AuthorDate: Fri Sep 5 12:08:01 2025 +0800

    refactor(hitl): remove unnecessary helper functions and reduce one call through joinedload (#55260)
---
 .../api_fastapi/core_api/routes/public/hitl.py     | 120 +++++++--------------
 1 file changed, 37 insertions(+), 83 deletions(-)

diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
index d36070a1a4a..e04714e0a91 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
@@ -61,17 +61,21 @@ hitl_router = AirflowRouter(tags=["HumanInTheLoop"], prefix="/hitlDetails")
 log = structlog.get_logger(__name__)
 
 
-def _get_task_instance(
+def _get_task_instance_with_hitl_detail(
     dag_id: str,
     dag_run_id: str,
     task_id: str,
     session: SessionDep,
     map_index: int,
 ) -> TI:
-    query = select(TI).where(
-        TI.dag_id == dag_id,
-        TI.run_id == dag_run_id,
-        TI.task_id == task_id,
+    query = (
+        select(TI)
+        .where(
+            TI.dag_id == dag_id,
+            TI.run_id == dag_run_id,
+            TI.task_id == task_id,
+        )
+        .options(joinedload(TI.hitl_detail))
     )
 
     if map_index is not None:
@@ -87,38 +91,53 @@ def _get_task_instance(
             ),
         )
 
+    if not task_instance.hitl_detail:
+        raise HTTPException(
+            status_code=status.HTTP_404_NOT_FOUND,
+            detail=f"Human-in-the-loop detail does not exist for Task Instance with id {task_instance.id}",
+        )
+
     return task_instance
 
 
-def _update_hitl_detail(
+@hitl_router.patch(
+    "/{dag_id}/{dag_run_id}/{task_id}",
+    responses=create_openapi_http_exception_doc(
+        [
+            status.HTTP_403_FORBIDDEN,
+            status.HTTP_404_NOT_FOUND,
+            status.HTTP_409_CONFLICT,
+        ]
+    ),
+    dependencies=[
+        Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.HITL_DETAIL)),
+        Depends(action_logging()),
+    ],
+)
+def update_hitl_detail(
     dag_id: str,
     dag_run_id: str,
     task_id: str,
     update_hitl_detail_payload: UpdateHITLDetailPayload,
     user: GetUserDep,
     session: SessionDep,
-    map_index: int,
+    map_index: int = -1,
 ) -> HITLDetailResponse:
-    task_instance = _get_task_instance(
+    """Update a Human-in-the-loop detail."""
+    task_instance = _get_task_instance_with_hitl_detail(
         dag_id=dag_id,
         dag_run_id=dag_run_id,
         task_id=task_id,
         session=session,
         map_index=map_index,
     )
-    ti_id_str = str(task_instance.id)
-    hitl_detail_model = session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == ti_id_str))
-    if not hitl_detail_model:
-        raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
-            detail=f"Human-in-the-loop detail does not exist for Task Instance with id {ti_id_str}",
-        )
 
+    hitl_detail_model = task_instance.hitl_detail
     if hitl_detail_model.response_received:
         raise HTTPException(
             status_code=status.HTTP_409_CONFLICT,
             detail=(
-                f"Human-in-the-loop detail has already been updated for Task Instance with id {ti_id_str} "
+                f"Human-in-the-loop detail has already been updated for Task Instance with id {task_instance.id} "
                 "and is not allowed to write again."
             ),
         )
@@ -146,72 +165,6 @@ def _update_hitl_detail(
     return HITLDetailResponse.model_validate(hitl_detail_model)
 
 
-def _get_hitl_detail(
-    dag_id: str,
-    dag_run_id: str,
-    task_id: str,
-    session: SessionDep,
-    map_index: int,
-) -> HITLDetail:
-    """Get a Human-in-the-loop detail of a specific task instance."""
-    task_instance = _get_task_instance(
-        dag_id=dag_id,
-        dag_run_id=dag_run_id,
-        task_id=task_id,
-        session=session,
-        map_index=map_index,
-    )
-
-    ti_id_str = str(task_instance.id)
-    hitl_detail_model = session.scalar(
-        select(HITLDetailModel)
-        .where(HITLDetailModel.ti_id == ti_id_str)
-        .options(joinedload(HITLDetailModel.task_instance))
-    )
-    if not hitl_detail_model:
-        log.error("Human-in-the-loop detail does not exist for Task Instance with id %s", ti_id_str)
-        raise HTTPException(
-            status_code=status.HTTP_404_NOT_FOUND,
-            detail=f"Human-in-the-loop detail does not exist for Task Instance with id {ti_id_str}",
-        )
-    return HITLDetail.model_validate(hitl_detail_model)
-
-
-@hitl_router.patch(
-    "/{dag_id}/{dag_run_id}/{task_id}",
-    responses=create_openapi_http_exception_doc(
-        [
-            status.HTTP_403_FORBIDDEN,
-            status.HTTP_404_NOT_FOUND,
-            status.HTTP_409_CONFLICT,
-        ]
-    ),
-    dependencies=[
-        Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.HITL_DETAIL)),
-        Depends(action_logging()),
-    ],
-)
-def update_hitl_detail(
-    dag_id: str,
-    dag_run_id: str,
-    task_id: str,
-    update_hitl_detail_payload: UpdateHITLDetailPayload,
-    user: GetUserDep,
-    session: SessionDep,
-    map_index: int = -1,
-) -> HITLDetailResponse:
-    """Update a Human-in-the-loop detail."""
-    return _update_hitl_detail(
-        dag_id=dag_id,
-        dag_run_id=dag_run_id,
-        task_id=task_id,
-        session=session,
-        update_hitl_detail_payload=update_hitl_detail_payload,
-        user=user,
-        map_index=map_index,
-    )
-
-
 @hitl_router.get(
     "/{dag_id}/{dag_run_id}/{task_id}",
     status_code=status.HTTP_200_OK,
@@ -226,13 +179,14 @@ def get_hitl_detail(
     map_index: int = -1,
 ) -> HITLDetail:
     """Get a Human-in-the-loop detail of a specific task instance."""
-    return _get_hitl_detail(
+    task_instance = _get_task_instance_with_hitl_detail(
         dag_id=dag_id,
         dag_run_id=dag_run_id,
         task_id=task_id,
         session=session,
         map_index=map_index,
     )
+    return task_instance.hitl_detail
 
 
 @hitl_router.get(

Reply via email to