This is an automated email from the ASF dual-hosted git repository.
weilee pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new fc29492e7ed refactor(hitl): remove unnecessary helper functions and reduce one call through joinedload (#55260)
fc29492e7ed is described below
commit fc29492e7ed814ad0340c76f106089d9b3efa886
Author: Wei Lee <[email protected]>
AuthorDate: Fri Sep 5 12:08:01 2025 +0800
refactor(hitl): remove unnecessary helper functions and reduce one call through joinedload (#55260)
---
.../api_fastapi/core_api/routes/public/hitl.py | 120 +++++++--------------
1 file changed, 37 insertions(+), 83 deletions(-)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
index d36070a1a4a..e04714e0a91 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py
@@ -61,17 +61,21 @@ hitl_router = AirflowRouter(tags=["HumanInTheLoop"], prefix="/hitlDetails")
log = structlog.get_logger(__name__)
-def _get_task_instance(
+def _get_task_instance_with_hitl_detail(
dag_id: str,
dag_run_id: str,
task_id: str,
session: SessionDep,
map_index: int,
) -> TI:
- query = select(TI).where(
- TI.dag_id == dag_id,
- TI.run_id == dag_run_id,
- TI.task_id == task_id,
+ query = (
+ select(TI)
+ .where(
+ TI.dag_id == dag_id,
+ TI.run_id == dag_run_id,
+ TI.task_id == task_id,
+ )
+ .options(joinedload(TI.hitl_detail))
)
if map_index is not None:
@@ -87,38 +91,53 @@ def _get_task_instance(
),
)
+ if not task_instance.hitl_detail:
+ raise HTTPException(
+ status_code=status.HTTP_404_NOT_FOUND,
+ detail=f"Human-in-the-loop detail does not exist for Task Instance with id {task_instance.id}",
+ )
+
return task_instance
-def _update_hitl_detail(
+@hitl_router.patch(
+ "/{dag_id}/{dag_run_id}/{task_id}",
+ responses=create_openapi_http_exception_doc(
+ [
+ status.HTTP_403_FORBIDDEN,
+ status.HTTP_404_NOT_FOUND,
+ status.HTTP_409_CONFLICT,
+ ]
+ ),
+ dependencies=[
+ Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.HITL_DETAIL)),
+ Depends(action_logging()),
+ ],
+)
+def update_hitl_detail(
dag_id: str,
dag_run_id: str,
task_id: str,
update_hitl_detail_payload: UpdateHITLDetailPayload,
user: GetUserDep,
session: SessionDep,
- map_index: int,
+ map_index: int = -1,
) -> HITLDetailResponse:
- task_instance = _get_task_instance(
+ """Update a Human-in-the-loop detail."""
+ task_instance = _get_task_instance_with_hitl_detail(
dag_id=dag_id,
dag_run_id=dag_run_id,
task_id=task_id,
session=session,
map_index=map_index,
)
- ti_id_str = str(task_instance.id)
- hitl_detail_model = session.scalar(select(HITLDetailModel).where(HITLDetailModel.ti_id == ti_id_str))
- if not hitl_detail_model:
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Human-in-the-loop detail does not exist for Task Instance with id {ti_id_str}",
- )
+ hitl_detail_model = task_instance.hitl_detail
if hitl_detail_model.response_received:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=(
- f"Human-in-the-loop detail has already been updated for Task Instance with id {ti_id_str} "
+ f"Human-in-the-loop detail has already been updated for Task Instance with id {task_instance.id} "
"and is not allowed to write again."
),
)
@@ -146,72 +165,6 @@ def _update_hitl_detail(
return HITLDetailResponse.model_validate(hitl_detail_model)
-def _get_hitl_detail(
- dag_id: str,
- dag_run_id: str,
- task_id: str,
- session: SessionDep,
- map_index: int,
-) -> HITLDetail:
- """Get a Human-in-the-loop detail of a specific task instance."""
- task_instance = _get_task_instance(
- dag_id=dag_id,
- dag_run_id=dag_run_id,
- task_id=task_id,
- session=session,
- map_index=map_index,
- )
-
- ti_id_str = str(task_instance.id)
- hitl_detail_model = session.scalar(
- select(HITLDetailModel)
- .where(HITLDetailModel.ti_id == ti_id_str)
- .options(joinedload(HITLDetailModel.task_instance))
- )
- if not hitl_detail_model:
- log.error("Human-in-the-loop detail does not exist for Task Instance with id %s", ti_id_str)
- raise HTTPException(
- status_code=status.HTTP_404_NOT_FOUND,
- detail=f"Human-in-the-loop detail does not exist for Task Instance with id {ti_id_str}",
- )
- return HITLDetail.model_validate(hitl_detail_model)
-
-
-@hitl_router.patch(
- "/{dag_id}/{dag_run_id}/{task_id}",
- responses=create_openapi_http_exception_doc(
- [
- status.HTTP_403_FORBIDDEN,
- status.HTTP_404_NOT_FOUND,
- status.HTTP_409_CONFLICT,
- ]
- ),
- dependencies=[
- Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.HITL_DETAIL)),
- Depends(action_logging()),
- ],
-)
-def update_hitl_detail(
- dag_id: str,
- dag_run_id: str,
- task_id: str,
- update_hitl_detail_payload: UpdateHITLDetailPayload,
- user: GetUserDep,
- session: SessionDep,
- map_index: int = -1,
-) -> HITLDetailResponse:
- """Update a Human-in-the-loop detail."""
- return _update_hitl_detail(
- dag_id=dag_id,
- dag_run_id=dag_run_id,
- task_id=task_id,
- session=session,
- update_hitl_detail_payload=update_hitl_detail_payload,
- user=user,
- map_index=map_index,
- )
-
-
@hitl_router.get(
"/{dag_id}/{dag_run_id}/{task_id}",
status_code=status.HTTP_200_OK,
@@ -226,13 +179,14 @@ def get_hitl_detail(
map_index: int = -1,
) -> HITLDetail:
"""Get a Human-in-the-loop detail of a specific task instance."""
- return _get_hitl_detail(
+ task_instance = _get_task_instance_with_hitl_detail(
dag_id=dag_id,
dag_run_id=dag_run_id,
task_id=task_id,
session=session,
map_index=map_index,
)
+ return task_instance.hitl_detail
@hitl_router.get(