This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new e3b5e77ca05 [providers] Airbyte add submit_reset_connection func (#63578)
e3b5e77ca05 is described below

commit e3b5e77ca05786aeeaeedfce524303fb2ff8c16b
Author: nhuantho <[email protected]>
AuthorDate: Sat Mar 14 23:50:39 2026 +0700

    [providers] Airbyte add submit_reset_connection func (#63578)
---
 .../airbyte/src/airflow/providers/airbyte/hooks/airbyte.py | 14 ++++++++++++++
 providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py | 10 ++++++++++
 2 files changed, 24 insertions(+)

diff --git a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
index 85ba0bb2dab..4094b83d316 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
@@ -188,6 +188,20 @@ class AirbyteHook(BaseHook):
         except Exception as e:
             raise AirflowException(e)
 
+    def submit_reset_connection(self, connection_id: str) -> Any:
+        try:
+            self.log.debug("Creating job request..")
+            res = self.airbyte_api.jobs.create_job(
+                request=JobCreateRequest(
+                    connection_id=connection_id,
+                    job_type=JobTypeEnum.RESET,
+                )
+            )
+            self.log.debug("Job request successful, response: %s", res.job_response)
+            return res.job_response
+        except Exception as e:
+            raise AirflowException(e)
+
     def cancel_job(self, job_id: int) -> Any:
         """
         Cancel the job when task is cancelled.
diff --git a/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py
index 3bcf31d6458..f75fc1cb76a 100644
--- a/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py
@@ -47,6 +47,7 @@ class TestAirbyteHook:
     health_endpoint = "http://test-airbyte:8001/api/v1/health"
     _mock_proxy = {"proxies": {"http": "http://proxy:8080", "https": "https://proxy:8080"}}
     _mock_sync_conn_success_response_body = {"job": {"id": 1}}
+    _mock_reset_conn_success_response_body = {"job": {"id": 2}}
     _mock_job_status_success_response_body = {"job": {"status": "succeeded"}}
     _mock_job_cancel_status = "cancelled"
 
@@ -92,6 +93,15 @@ class TestAirbyteHook:
         resp = self.hook.submit_sync_connection(connection_id=self.connection_id)
         assert resp == self._mock_sync_conn_success_response_body
 
+    @mock.patch("airbyte_api.jobs.Jobs.create_job")
+    def test_submit_reset_connection(self, create_job_mock):
+        mock_response = mock.Mock()
+        mock_response.job_response = self._mock_reset_conn_success_response_body
+        create_job_mock.return_value = mock_response
+
+        resp = self.hook.submit_reset_connection(connection_id=self.connection_id)
+        assert resp == self._mock_reset_conn_success_response_body
+
     @mock.patch("airbyte_api.jobs.Jobs.get_job")
     def test_get_job_status(self, get_job_mock):
         mock_response = mock.AsyncMock()

Reply via email to