This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 1a2784d48d9 Fix ExternalTaskSensor to use timeout parameter in 
deferrable mode (#… (#62556)
1a2784d48d9 is described below

commit 1a2784d48d91ac36d8b0c66f32bbef2465c55bca
Author: y-sudharshan <[email protected]>
AuthorDate: Thu Mar 12 06:20:00 2026 +0530

    Fix ExternalTaskSensor to use timeout parameter in deferrable mode (#… 
(#62556)
    
    * Fix ExternalTaskSensor to use timeout parameter in deferrable mode 
(#62516)
    
    ExternalTaskSensor was incorrectly using execution_timeout instead of 
timeout when deferring in deferrable mode, causing inconsistent behavior across 
poke, reschedule, and deferrable modes.
    
    Changes:
    
    - Updated execute() to use timedelta(seconds=self.timeout) with fallback to 
execution_timeout
    
    - Added comprehensive test coverage for timeout behavior in deferrable mode
    
    - Ensures consistent timeout semantics across all sensor modes
    
    * Simplify timeout logic and update tests
    
    Remove incorrect fallback to execution_timeout. Timeout parameter always 
has a default value from config, so no fallback is needed or correct.
    
    Update tests to validate correct behavior matching other sensors
    
    * Remove empty SOLUTION.md file
    
    * Fix ExternalTaskSensor timeout handling in deferrable mode
    
    - Implement fallback to execution_timeout when timeout is not set (0 or 
None)
    - Update tests to cover 3 distinct scenarios:
      1. timeout set, execution_timeout not set
      2. timeout not set (0), execution_timeout set
      3. timeout and execution_timeout both set (priority test)
    - Fix whitespace issue flagged by ruff
    
    * Retrigger CI
---
 .../providers/standard/sensors/external_task.py    |  9 ++-
 .../standard/sensors/test_external_task_sensor.py  | 71 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/providers/standard/src/airflow/providers/standard/sensors/external_task.py b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
index 91e7821d304..ff266fc9010 100644
--- a/providers/standard/src/airflow/providers/standard/sensors/external_task.py
+++ b/providers/standard/src/airflow/providers/standard/sensors/external_task.py
@@ -432,10 +432,15 @@ class ExternalTaskSensor(BaseSensorOperator):
         if not self.deferrable:
             super().execute(context)
         else:
+            # Determine the timeout to use: prefer timeout parameter, fallback to execution_timeout
+            timeout_value = self.timeout
+            if not timeout_value and self.execution_timeout:
+                timeout_value = self.execution_timeout.total_seconds()
+
             dttm_filter = self._get_dttm_filter(context)
             if AIRFLOW_V_3_0_PLUS:
                 self.defer(
-                    timeout=self.execution_timeout,
+                    timeout=datetime.timedelta(seconds=timeout_value) if timeout_value else None,
                     trigger=WorkflowTrigger(
                         external_dag_id=self.external_dag_id,
                         external_task_group_id=self.external_task_group_id,
@@ -453,7 +458,7 @@ class ExternalTaskSensor(BaseSensorOperator):
                 )
             else:
                 self.defer(
-                    timeout=self.execution_timeout,
+                    timeout=datetime.timedelta(seconds=timeout_value) if timeout_value else None,
                     trigger=WorkflowTrigger(
                         external_dag_id=self.external_dag_id,
                         external_task_group_id=self.external_task_group_id,
diff --git a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
index b25da386dcf..f48189f07ad 100644
--- a/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
+++ b/providers/standard/tests/unit/standard/sensors/test_external_task_sensor.py
@@ -1049,6 +1049,77 @@ exit 0
         assert exc.value.trigger.external_task_ids == ["test_task"]
         assert exc.value.trigger.execution_dates == [DEFAULT_DATE]
 
+    @pytest.mark.execution_timeout(10)
+    def test_external_task_sensor_deferrable_timeout_only(self, dag_maker):
+        """Test that deferrable mode uses timeout parameter when only timeout is set."""
+        context = {"execution_date": DEFAULT_DATE}
+        with dag_maker() as dag:
+            op = ExternalTaskSensor(
+                task_id="test_external_task_sensor_check",
+                external_dag_id="test_dag_parent",
+                external_task_id="test_task",
+                deferrable=True,
+                timeout=60,
+            )
+            dr = dag.create_dagrun(
+                run_id="test_run",
+                run_type=DagRunType.MANUAL,
+                state=None,
+            )
+            context.update(dag_run=dr, logical_date=DEFAULT_DATE)
+
+        with pytest.raises(TaskDeferred) as exc:
+            op.execute(context=context)
+        assert exc.value.timeout == timedelta(seconds=60)
+
+    @pytest.mark.execution_timeout(10)
+    def test_external_task_sensor_deferrable_execution_timeout_only(self, dag_maker):
+        """Test that deferrable mode falls back to execution_timeout when timeout is not set."""
+        context = {"execution_date": DEFAULT_DATE}
+        with dag_maker() as dag:
+            op = ExternalTaskSensor(
+                task_id="test_external_task_sensor_check",
+                external_dag_id="test_dag_parent",
+                external_task_id="test_task",
+                deferrable=True,
+                timeout=0,  # Explicitly set to 0 to indicate not using timeout
+                execution_timeout=timedelta(seconds=120),
+            )
+            dr = dag.create_dagrun(
+                run_id="test_run",
+                run_type=DagRunType.MANUAL,
+                state=None,
+            )
+            context.update(dag_run=dr, logical_date=DEFAULT_DATE)
+
+        with pytest.raises(TaskDeferred) as exc:
+            op.execute(context=context)
+        assert exc.value.timeout == timedelta(seconds=120)
+
+    @pytest.mark.execution_timeout(10)
+    def test_external_task_sensor_deferrable_timeout_priority(self, dag_maker):
+        """Test that deferrable mode prioritizes timeout over execution_timeout when both are set."""
+        context = {"execution_date": DEFAULT_DATE}
+        with dag_maker() as dag:
+            op = ExternalTaskSensor(
+                task_id="test_external_task_sensor_check",
+                external_dag_id="test_dag_parent",
+                external_task_id="test_task",
+                deferrable=True,
+                timeout=90,
+                execution_timeout=timedelta(seconds=120),
+            )
+            dr = dag.create_dagrun(
+                run_id="test_run",
+                run_type=DagRunType.MANUAL,
+                state=None,
+            )
+            context.update(dag_run=dr, logical_date=DEFAULT_DATE)
+
+        with pytest.raises(TaskDeferred) as exc:
+            op.execute(context=context)
+        assert exc.value.timeout == timedelta(seconds=90)
+
     def test_get_logical_date(self):
         """For AF 2, we check for execution_date in context."""
         context = {"execution_date": DEFAULT_DATE}

Reply via email to