This is an automated email from the ASF dual-hosted git repository.

jscheffl pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 57e8a7a6a19 Fix deferrable execution_timeout handling in AirbyteTriggerSyncOperator (#67382)
57e8a7a6a19 is described below

commit 57e8a7a6a1979ff9d82e7c6a8c62f048027d2d8f
Author: SameerMesiah97 <[email protected]>
AuthorDate: Mon May 25 16:00:41 2026 +0100

    Fix deferrable execution_timeout handling in AirbyteTriggerSyncOperator (#67382)
    
    Prevent framework-level deferred timeouts from bypassing
    execute_complete() cancellation handling for Airbyte jobs.
    
    Co-authored-by: Sameer Mesiah <[email protected]>
---
 .../airflow/providers/airbyte/operators/airbyte.py |  2 +-
 .../tests/unit/airbyte/operators/test_airbyte.py   | 55 ++++++++++++++++++++++
 2 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
index 7f8f80a1b0b..ae4e19e125f 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
@@ -111,7 +111,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
             self.log.debug("Running in deferrable mode in job state %s...", state)
             if state in (JobStatusEnum.RUNNING, JobStatusEnum.PENDING, JobStatusEnum.INCOMPLETE):
                 self.defer(
-                    timeout=self.execution_timeout,
+                    timeout=None,
                     trigger=AirbyteSyncTrigger(
                         conn_id=self.airbyte_conn_id,
                         job_id=self.job_id,
diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
index 7cb792e960d..b9165dca06f 100644
--- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations
 
+from datetime import timedelta
 from unittest import mock
 
 import pytest
@@ -69,6 +70,60 @@ class TestAirbyteTriggerSyncOp:
             job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
         )
 
+    @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteTriggerSyncOperator.defer")
+    @mock.patch("airflow.providers.airbyte.operators.airbyte.AirbyteSyncTrigger")
+    @mock.patch("airbyte_api.jobs.Jobs.create_job")
+    def test_execute_deferrable_does_not_pass_execution_timeout_to_defer(
+        self,
+        mock_create_job,
+        mock_airbyte_trigger,
+        mock_defer,
+        create_connection_without_db,
+    ):
+        conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
+        create_connection_without_db(conn)
+
+        mock_response = mock.Mock()
+        mock_response.job_response = JobResponse(
+            connection_id="connection-mock",
+            job_id=1,
+            start_time="today",
+            job_type=JobTypeEnum.SYNC,
+            status=JobStatusEnum.RUNNING,
+        )
+        mock_create_job.return_value = mock_response
+
+        op = AirbyteTriggerSyncOperator(
+            task_id="test_airbyte_op",
+            airbyte_conn_id=self.airbyte_conn_id,
+            connection_id=self.connection_id,
+            wait_seconds=self.wait_seconds,
+            timeout=self.timeout,
+            deferrable=True,
+            execution_timeout=timedelta(seconds=30),
+        )
+
+        op.execute({})
+
+        # Explicitly pass timeout=None so Airflow's framework-level deferred
+        # timeout handling does not bypass execute_complete(), which is
+        # responsible for Airbyte job cancellation in deferrable mode.
+        mock_defer.assert_called_once_with(
+            method_name="execute_complete",
+            trigger=mock_airbyte_trigger.return_value,
+            timeout=None,
+        )
+
+        # Ensure the trigger still receives execution_deadline handling for
+        # Airbyte job timeout cancellation processing.
+        mock_airbyte_trigger.assert_called_once_with(
+            conn_id=self.airbyte_conn_id,
+            job_id=self.job_id,
+            end_time=mock.ANY,
+            execution_deadline=mock.ANY,
+            poll_interval=60,
+        )
+
     @pytest.mark.parametrize(
         ("status", "should_raise", "expected_message"),
         [

Reply via email to