This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new e3b5e77ca05 [providers] Airbyte add submit_reset_connection func (#63578)
e3b5e77ca05 is described below
commit e3b5e77ca05786aeeaeedfce524303fb2ff8c16b
Author: nhuantho <[email protected]>
AuthorDate: Sat Mar 14 23:50:39 2026 +0700
[providers] Airbyte add submit_reset_connection func (#63578)
---
.../airbyte/src/airflow/providers/airbyte/hooks/airbyte.py | 14 ++++++++++++++
providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py | 10 ++++++++++
2 files changed, 24 insertions(+)
diff --git a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
index 85ba0bb2dab..4094b83d316 100644
--- a/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
+++ b/providers/airbyte/src/airflow/providers/airbyte/hooks/airbyte.py
@@ -188,6 +188,20 @@ class AirbyteHook(BaseHook):
except Exception as e:
raise AirflowException(e)
+ def submit_reset_connection(self, connection_id: str) -> Any:
+ try:
+ self.log.debug("Creating job request..")
+ res = self.airbyte_api.jobs.create_job(
+ request=JobCreateRequest(
+ connection_id=connection_id,
+ job_type=JobTypeEnum.RESET,
+ )
+ )
+ self.log.debug("Job request successful, response: %s", res.job_response)
+ return res.job_response
+ except Exception as e:
+ raise AirflowException(e)
+
def cancel_job(self, job_id: int) -> Any:
"""
Cancel the job when task is cancelled.
diff --git a/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py b/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py
index 3bcf31d6458..f75fc1cb76a 100644
--- a/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py
+++ b/providers/airbyte/tests/unit/airbyte/hooks/test_airbyte.py
@@ -47,6 +47,7 @@ class TestAirbyteHook:
health_endpoint = "http://test-airbyte:8001/api/v1/health"
_mock_proxy = {"proxies": {"http": "http://proxy:8080", "https": "https://proxy:8080"}}
_mock_sync_conn_success_response_body = {"job": {"id": 1}}
+ _mock_reset_conn_success_response_body = {"job": {"id": 2}}
_mock_job_status_success_response_body = {"job": {"status": "succeeded"}}
_mock_job_cancel_status = "cancelled"
@@ -92,6 +93,15 @@ class TestAirbyteHook:
resp = self.hook.submit_sync_connection(connection_id=self.connection_id)
assert resp == self._mock_sync_conn_success_response_body
+ @mock.patch("airbyte_api.jobs.Jobs.create_job")
+ def test_submit_reset_connection(self, create_job_mock):
+ mock_response = mock.Mock()
+ mock_response.job_response = self._mock_reset_conn_success_response_body
+ create_job_mock.return_value = mock_response
+
+ resp = self.hook.submit_reset_connection(connection_id=self.connection_id)
+ assert resp == self._mock_reset_conn_success_response_body
+
@mock.patch("airbyte_api.jobs.Jobs.get_job")
def test_get_job_status(self, get_job_mock):
mock_response = mock.AsyncMock()