This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 1a2784d48d9 Fix ExternalTaskSensor to use timeout parameter in
deferrable mode (#62516) (#62556)
1a2784d48d9 is described below
commit 1a2784d48d91ac36d8b0c66f32bbef2465c55bca
Author: y-sudharshan <[email protected]>
AuthorDate: Thu Mar 12 06:20:00 2026 +0530
Fix ExternalTaskSensor to use timeout parameter in deferrable mode (#62516)
(#62556)
* Fix ExternalTaskSensor to use timeout parameter in deferrable mode
(#62516)
ExternalTaskSensor was incorrectly using execution_timeout instead of
timeout when deferring in deferrable mode, causing inconsistent behavior across
poke, reschedule, and deferrable modes.
Changes:
- Updated execute() to use timedelta(seconds=self.timeout) with fallback to
execution_timeout
- Added comprehensive test coverage for timeout behavior in deferrable mode
- Ensures consistent timeout semantics across all sensor modes
* Simplify timeout logic and update tests
Remove incorrect fallback to execution_timeout. Timeout parameter always
has a default value from config, so no fallback is needed or correct.
Update tests to validate correct behavior matching other sensors
* Remove empty SOLUTION.md file
* Fix ExternalTaskSensor timeout handling in deferrable mode
- Implement fallback to execution_timeout when timeout is not set (0 or
None)
- Update tests to cover 3 distinct scenarios:
1. timeout set, execution_timeout not set
2. timeout not set (0), execution_timeout set
3. timeout and execution_timeout both set (priority test)
- Fix whitespace issue flagged by ruff
* Retrigger CI
---
.../providers/standard/sensors/external_task.py | 9 ++-
.../standard/sensors/test_external_task_sensor.py | 71 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 2 deletions(-)
diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index 91e7821d304..ff266fc9010 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -432,10 +432,15 @@ class ExternalTaskSensor(BaseSensorOperator):
if not self.deferrable:
super().execute(context)
else:
+ # Determine the timeout to use: prefer timeout parameter, fallback to execution_timeout
+ timeout_value = self.timeout
+ if not timeout_value and self.execution_timeout:
+ timeout_value = self.execution_timeout.total_seconds()
+
dttm_filter = self._get_dttm_filter(context)
if AIRFLOW_V_3_0_PLUS:
self.defer(
- timeout=self.execution_timeout,
+ timeout=datetime.timedelta(seconds=timeout_value) if timeout_value else None,
trigger=WorkflowTrigger(
external_dag_id=self.external_dag_id,
external_task_group_id=self.external_task_group_id,
@@ -453,7 +458,7 @@ class ExternalTaskSensor(BaseSensorOperator):
)
else:
self.defer(
- timeout=self.execution_timeout,
+ timeout=datetime.timedelta(seconds=timeout_value) if timeout_value else None,
trigger=WorkflowTrigger(
external_dag_id=self.external_dag_id,
external_task_group_id=self.external_task_group_id,
diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index b25da386dcf..f48189f07ad 100644
--- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -1049,6 +1049,77 @@ exit 0
assert exc.value.trigger.external_task_ids == ["test_task"]
assert exc.value.trigger.execution_dates == [DEFAULT_DATE]
+ @pytest.mark.execution_timeout(10)
+ def test_external_task_sensor_deferrable_timeout_only(self, dag_maker):
+ """Test that deferrable mode uses timeout parameter when only timeout is set."""
+ context = {"execution_date": DEFAULT_DATE}
+ with dag_maker() as dag:
+ op = ExternalTaskSensor(
+ task_id="test_external_task_sensor_check",
+ external_dag_id="test_dag_parent",
+ external_task_id="test_task",
+ deferrable=True,
+ timeout=60,
+ )
+ dr = dag.create_dagrun(
+ run_id="test_run",
+ run_type=DagRunType.MANUAL,
+ state=None,
+ )
+ context.update(dag_run=dr, logical_date=DEFAULT_DATE)
+
+ with pytest.raises(TaskDeferred) as exc:
+ op.execute(context=context)
+ assert exc.value.timeout == timedelta(seconds=60)
+
+ @pytest.mark.execution_timeout(10)
+ def test_external_task_sensor_deferrable_execution_timeout_only(self, dag_maker):
+ """Test that deferrable mode falls back to execution_timeout when timeout is not set."""
+ context = {"execution_date": DEFAULT_DATE}
+ with dag_maker() as dag:
+ op = ExternalTaskSensor(
+ task_id="test_external_task_sensor_check",
+ external_dag_id="test_dag_parent",
+ external_task_id="test_task",
+ deferrable=True,
+ timeout=0, # Explicitly set to 0 to indicate not using timeout
+ execution_timeout=timedelta(seconds=120),
+ )
+ dr = dag.create_dagrun(
+ run_id="test_run",
+ run_type=DagRunType.MANUAL,
+ state=None,
+ )
+ context.update(dag_run=dr, logical_date=DEFAULT_DATE)
+
+ with pytest.raises(TaskDeferred) as exc:
+ op.execute(context=context)
+ assert exc.value.timeout == timedelta(seconds=120)
+
+ @pytest.mark.execution_timeout(10)
+ def test_external_task_sensor_deferrable_timeout_priority(self, dag_maker):
+ """Test that deferrable mode prioritizes timeout over execution_timeout when both are set."""
+ context = {"execution_date": DEFAULT_DATE}
+ with dag_maker() as dag:
+ op = ExternalTaskSensor(
+ task_id="test_external_task_sensor_check",
+ external_dag_id="test_dag_parent",
+ external_task_id="test_task",
+ deferrable=True,
+ timeout=90,
+ execution_timeout=timedelta(seconds=120),
+ )
+ dr = dag.create_dagrun(
+ run_id="test_run",
+ run_type=DagRunType.MANUAL,
+ state=None,
+ )
+ context.update(dag_run=dr, logical_date=DEFAULT_DATE)
+
+ with pytest.raises(TaskDeferred) as exc:
+ op.execute(context=context)
+ assert exc.value.timeout == timedelta(seconds=90)
+
def test_get_logical_date(self):
"""For AF 2, we check for execution_date in context."""
context = {"execution_date": DEFAULT_DATE}