This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new c141383d5bf fix: api_version on on_kill method (#46833)
c141383d5bf is described below

commit c141383d5bf70e59d03d64490f9e68cefcdabda3
Author: Danton Bertuol <[email protected]>
AuthorDate: Thu Feb 20 15:39:11 2025 -0300

    fix: api_version on on_kill method (#46833)
---
 .../src/airflow/providers/airbyte/operators/airbyte.py  |  2 +-
 .../tests/unit/airbyte/operators/test_airbyte.py        | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
index 757de753528..dd494e855ae 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/operators/airbyte.py
@@ -133,7 +133,7 @@ class AirbyteTriggerSyncOperator(BaseOperator):
 
     def on_kill(self):
         """Cancel the job if task is cancelled."""
-        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_type=self.api_type)
+        hook = AirbyteHook(airbyte_conn_id=self.airbyte_conn_id, api_version=self.api_version)
         if self.job_id:
             self.log.info("on_kill: cancel the airbyte Job %s", self.job_id)
             hook.cancel_job(self.job_id)
diff --git a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
index ffbddcd7e5b..f13cf2fa30c 100644
--- a/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/operators/test_airbyte.py
@@ -69,3 +69,20 @@ class TestAirbyteTriggerSyncOp:
         mock_wait_for_job.assert_called_once_with(
             job_id=self.job_id, wait_seconds=self.wait_seconds, timeout=self.timeout
         )
+
+    @mock.patch("airflow.providers.airbyte.hooks.airbyte.AirbyteHook.cancel_job")
+    def test_on_kill(self, mock_cancel_job):
+        conn = Connection(conn_id=self.airbyte_conn_id, conn_type="airbyte", host="airbyte.com")
+        db.merge_conn(conn)
+
+        op = AirbyteTriggerSyncOperator(
+            task_id="test_Airbyte_op",
+            airbyte_conn_id=self.airbyte_conn_id,
+            connection_id=self.connection_id,
+            wait_seconds=self.wait_seconds,
+            timeout=self.timeout,
+        )
+        op.job_id = self.job_id
+        op.on_kill()
+
+        mock_cancel_job.assert_called_once_with(self.job_id)

Reply via email to