This is an automated email from the ASF dual-hosted git repository.

rahulvats pushed a commit to branch py-client-sync in repository https://gitbox.apache.org/repos/asf/airflow.git
commit 99a3981f799cc1955648a30197892bbae9572ca6 Author: Antonio Mello <[email protected]> AuthorDate: Mon Mar 23 22:29:21 2026 -0300 fix(providers/standard): add response_timeout to HITLOperator to prevent race with execution_timeout (#63475) Co-authored-by: Claude Opus 4.6 <[email protected]> --- .../standard/example_dags/example_hitl_operator.py | 4 +-- .../airflow/providers/standard/operators/hitl.py | 27 +++++++++++++++-- .../tests/unit/standard/operators/test_hitl.py | 35 +++++++++++++++++++--- 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py index 96404a0ebc1..dc26aa15708 100644 --- a/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py +++ b/providers/standard/src/airflow/providers/standard/example_dags/example_hitl_operator.py @@ -118,7 +118,7 @@ with DAG( subject="Please choose option to proceed: ", options=["option 7", "option 8", "option 9"], defaults=["option 7"], - execution_timeout=datetime.timedelta(seconds=1), + response_timeout=datetime.timedelta(seconds=1), notifiers=[hitl_request_callback], on_success_callback=hitl_success_callback, on_failure_callback=hitl_failure_callback, @@ -136,7 +136,7 @@ with DAG( Timeout Option: {{ ti.xcom_pull(task_ids='wait_for_default_option')["chosen_options"] }} """, defaults="Reject", - execution_timeout=datetime.timedelta(minutes=5), + response_timeout=datetime.timedelta(minutes=5), notifiers=[hitl_request_callback], on_success_callback=hitl_success_callback, on_failure_callback=hitl_failure_callback, diff --git a/providers/standard/src/airflow/providers/standard/operators/hitl.py b/providers/standard/src/airflow/providers/standard/operators/hitl.py index b009422be8d..d3199a7b0a1 100644 --- a/providers/standard/src/airflow/providers/standard/operators/hitl.py +++ 
b/providers/standard/src/airflow/providers/standard/operators/hitl.py @@ -17,7 +17,9 @@ from __future__ import annotations import logging +import warnings +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException from airflow.providers.standard.version_compat import AIRFLOW_V_3_1_3_PLUS, AIRFLOW_V_3_1_PLUS @@ -25,6 +27,7 @@ if not AIRFLOW_V_3_1_PLUS: raise AirflowOptionalProviderFeatureException("Human in the loop functionality needs Airflow 3.1+.") from collections.abc import Collection, Mapping, Sequence +from datetime import timedelta from typing import TYPE_CHECKING, Any from urllib.parse import ParseResult, urlencode, urlparse, urlunparse @@ -55,6 +58,9 @@ class HITLOperator(BaseOperator): :param params: dictionary of parameter definitions that are in the format of Dag params such that a Form Field can be rendered. Entered data is validated (schema, required fields) like for a Dag run and added to XCom of the task result. + :param response_timeout: Maximum time to wait for a human response after deferring to the trigger. + This is separate from ``execution_timeout`` which controls the pre-defer execution phase. + If not set, no timeout is applied to the human response wait. """ template_fields: Collection[str] = ("subject", "body") @@ -70,9 +76,26 @@ class HITLOperator(BaseOperator): params: ParamsDict | dict[str, Any] | None = None, notifiers: Sequence[BaseNotifier] | BaseNotifier | None = None, assigned_users: HITLUser | list[HITLUser] | None = None, + response_timeout: timedelta | None = None, **kwargs, ) -> None: super().__init__(**kwargs) + + # Handle backward compatibility: if execution_timeout is set but response_timeout is not, + # migrate execution_timeout to response_timeout and clear it to prevent the BaseOperator + # timeout from racing the defer() call. 
+ if self.execution_timeout and not response_timeout: + warnings.warn( + "Passing `execution_timeout` to HITLOperator to control the human response wait is " + "deprecated. Use `response_timeout` instead. `execution_timeout` will be cleared to " + "prevent it from killing the task before defer() is reached.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + response_timeout = self.execution_timeout + self.execution_timeout = None + + self.response_timeout = response_timeout self.subject = subject self.body = body @@ -160,8 +183,8 @@ class HITLOperator(BaseOperator): assigned_users=self.assigned_users, ) - if self.execution_timeout: - timeout_datetime = utcnow() + self.execution_timeout + if self.response_timeout: + timeout_datetime = utcnow() + self.response_timeout else: timeout_datetime = None diff --git a/providers/standard/tests/unit/standard/operators/test_hitl.py b/providers/standard/tests/unit/standard/operators/test_hitl.py index 6a71c3d1d94..e1ebae5ab00 100644 --- a/providers/standard/tests/unit/standard/operators/test_hitl.py +++ b/providers/standard/tests/unit/standard/operators/test_hitl.py @@ -32,6 +32,7 @@ from urllib.parse import parse_qs, urlparse from sqlalchemy import select +from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.models import TaskInstance, Trigger from airflow.models.hitl import HITLDetail from airflow.providers.common.compat.sdk import AirflowException, DownstreamTasksSkipped, ParamValidationError @@ -1028,13 +1029,13 @@ class TestHITLSummaryForListeners: "serialized_params": None, } - def test_execute_enriches_summary_with_timeout(self) -> None: - """execute() adds timeout_datetime; all other init keys remain.""" + def test_execute_enriches_summary_with_response_timeout(self) -> None: + """execute() adds timeout_datetime using response_timeout; all other init keys remain.""" op = HITLOperator( task_id="test", subject="Review", options=["OK"], - execution_timeout=datetime.timedelta(minutes=10), + 
response_timeout=datetime.timedelta(minutes=10), ) with ( @@ -1084,6 +1085,32 @@ class TestHITLSummaryForListeners: "timeout_datetime": None, } + def test_execution_timeout_deprecated_and_migrated(self) -> None: + """execution_timeout is migrated to response_timeout with a deprecation warning.""" + with pytest.warns(AirflowProviderDeprecationWarning, match="Use `response_timeout` instead"): + op = HITLOperator( + task_id="test", + subject="Review", + options=["OK"], + execution_timeout=datetime.timedelta(minutes=10), + ) + + assert op.response_timeout == datetime.timedelta(minutes=10) + assert op.execution_timeout is None + + def test_response_timeout_does_not_clear_execution_timeout(self) -> None: + """When response_timeout is set, execution_timeout is left untouched.""" + op = HITLOperator( + task_id="test", + subject="Review", + options=["OK"], + response_timeout=datetime.timedelta(minutes=5), + execution_timeout=datetime.timedelta(minutes=30), + ) + + assert op.response_timeout == datetime.timedelta(minutes=5) + assert op.execution_timeout == datetime.timedelta(minutes=30) + def test_hitl_operator_execute_complete_enriches_summary(self) -> None: """execute_complete() adds response fields directly into hitl_summary.""" op = HITLOperator( @@ -1257,7 +1284,7 @@ class TestHITLSummaryForListeners: task_id="test", subject="Release v2.0?", body="Please approve the production deployment.", - execution_timeout=datetime.timedelta(minutes=30), + response_timeout=datetime.timedelta(minutes=30), ) # -- After __init__: only base + approval keys --
