This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch v3-1-test
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/v3-1-test by this push:
     new 6254fa6a8b7 [v3-1-test] Fix deferrable sensors not respecting soft_fail on timeout (#61132) (#61421)
6254fa6a8b7 is described below

commit 6254fa6a8b77957e0123a905dd6d90bcac1dd659
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Feb 18 21:32:34 2026 +0100

    [v3-1-test] Fix deferrable sensors not respecting soft_fail on timeout (#61132) (#61421)
    
    When a deferrable sensor with soft_fail=True times out, the task
    fails with AirflowSensorTimeout instead of being marked as SKIPPED.
    This is a regression from Airflow 2.x behavior.
    
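    The issue was in resume_execution(), where TaskDeferralTimeout was
    converted to AirflowSensorTimeout in an except clause that sat beside
    the soft_fail handler on the same try statement, so the converted
    exception bypassed the soft_fail check. This fix nests the conversion
    in an inner try/except so that the outer handler applies soft_fail and
    never_fail to the converted exception, ensuring timeouts are properly
    skipped.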
    
    closes: #61130
    (cherry picked from commit cec8ba68f19cb21293979a6f042909d5ccbaac11)
    
    Co-authored-by: Nathan Hadfield <[email protected]>
    Co-authored-by: Claude Sonnet 4.5 <[email protected]>
---
 task-sdk/src/airflow/sdk/bases/sensor.py     | 13 +++++--
 task-sdk/tests/task_sdk/bases/test_sensor.py | 57 ++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/task-sdk/src/airflow/sdk/bases/sensor.py 
b/task-sdk/src/airflow/sdk/bases/sensor.py
index 0ca447189a9..66a97c10f0e 100644
--- a/task-sdk/src/airflow/sdk/bases/sensor.py
+++ b/task-sdk/src/airflow/sdk/bases/sensor.py
@@ -251,13 +251,18 @@ class BaseSensorOperator(BaseOperator):
         return xcom_value
 
     def resume_execution(self, next_method: str, next_kwargs: dict[str, Any] | 
None, context: Context):
+        # Use nested try/except to convert TaskDeferralTimeout to 
AirflowSensorTimeout
+        # while still allowing soft_fail/never_fail to handle both exception 
types.
         try:
-            return super().resume_execution(next_method, next_kwargs, context)
-        except TaskDeferralTimeout as e:
-            raise AirflowSensorTimeout(*e.args) from e
+            try:
+                return super().resume_execution(next_method, next_kwargs, 
context)
+            except TaskDeferralTimeout as e:
+                raise AirflowSensorTimeout(*e.args) from e
         except (AirflowException, TaskDeferralError) as e:
             if self.soft_fail:
-                raise AirflowSkipException(str(e)) from e
+                raise AirflowSkipException("Skipping due to soft_fail is set 
to True.") from e
+            if self.never_fail:
+                raise AirflowSkipException("Skipping due to never_fail is set 
to True.") from e
             raise
 
     def _get_next_poke_interval(
diff --git a/task-sdk/tests/task_sdk/bases/test_sensor.py 
b/task-sdk/tests/task_sdk/bases/test_sensor.py
index 8b9cc04a786..b37cdac3b80 100644
--- a/task-sdk/tests/task_sdk/bases/test_sensor.py
+++ b/task-sdk/tests/task_sdk/bases/test_sensor.py
@@ -31,6 +31,7 @@ from airflow.exceptions import (
     AirflowSensorTimeout,
     AirflowSkipException,
     AirflowTaskTimeout,
+    TaskDeferralError,
 )
 from airflow.models.trigger import TriggerFailureReason
 from airflow.providers.standard.operators.empty import EmptyOperator
@@ -677,3 +678,59 @@ class TestAsyncSensor:
         async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", 
soft_fail=soft_fail)
         with pytest.raises(expected_exception):
             async_sensor.resume_execution("execute_complete", None, {})
+
+    @pytest.mark.parametrize(
+        ("soft_fail", "expected_exception"),
+        [
+            (True, AirflowSkipException),
+            (False, AirflowSensorTimeout),
+        ],
+    )
+    def test_timeout_after_resuming_deferred_sensor_with_soft_fail(self, 
soft_fail, expected_exception):
+        """Test that deferrable sensors with soft_fail skip on timeout instead 
of failing."""
+        async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", 
soft_fail=soft_fail)
+        with pytest.raises(expected_exception):
+            async_sensor.resume_execution(
+                next_method="__fail__",
+                next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
+                context={},
+            )
+
+    def test_timeout_after_resuming_deferred_sensor_with_never_fail(self):
+        """Test that deferrable sensors with never_fail skip on timeout."""
+        async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", 
never_fail=True)
+        with pytest.raises(AirflowSkipException):
+            async_sensor.resume_execution(
+                next_method="__fail__",
+                next_kwargs={"error": TriggerFailureReason.TRIGGER_TIMEOUT},
+                context={},
+            )
+
+    @pytest.mark.parametrize(
+        ("soft_fail", "expected_exception"),
+        [
+            (True, AirflowSkipException),
+            (False, TaskDeferralError),
+        ],
+    )
+    def test_trigger_failure_after_resuming_deferred_sensor_with_soft_fail(
+        self, soft_fail, expected_exception
+    ):
+        """Test that deferrable sensors with soft_fail skip on trigger failure 
instead of failing."""
+        async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", 
soft_fail=soft_fail)
+        with pytest.raises(expected_exception):
+            async_sensor.resume_execution(
+                next_method="__fail__",
+                next_kwargs={"error": TriggerFailureReason.TRIGGER_FAILURE},
+                context={},
+            )
+
+    def 
test_trigger_failure_after_resuming_deferred_sensor_with_never_fail(self):
+        """Test that deferrable sensors with never_fail skip on trigger 
failure."""
+        async_sensor = DummyAsyncSensor(task_id="dummy_async_sensor", 
never_fail=True)
+        with pytest.raises(AirflowSkipException):
+            async_sensor.resume_execution(
+                next_method="__fail__",
+                next_kwargs={"error": TriggerFailureReason.TRIGGER_FAILURE},
+                context={},
+            )

Reply via email to