This is an automated email from the ASF dual-hosted git repository.

husseinawala pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new ae46b9e721c fix: kpo async kube_config_path (#45571)
ae46b9e721c is described below

commit ae46b9e721ceb1e9d6a8536abbd501a3aeda6244
Author: raphaelauv <[email protected]>
AuthorDate: Fri Feb 7 14:50:15 2025 +0100

    fix: kpo async kube_config_path (#45571)
---
 .../providers/cncf/kubernetes/hooks/kubernetes.py  | 16 ++++++++--
 .../cncf/kubernetes/hooks/test_kubernetes.py       | 35 ++++++++++++++++++++++
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 114a933e50e..0c9b655bd1f 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -734,9 +734,11 @@ class AsyncKubernetesHook(KubernetesHook):
         """Return Kubernetes API session for use with requests."""
         in_cluster = self._coalesce_param(self.in_cluster, await self._get_field("in_cluster"))
         cluster_context = self._coalesce_param(self.cluster_context, await self._get_field("cluster_context"))
+        kubeconfig_path = await self._get_field("kube_config_path")
         kubeconfig = await self._get_field("kube_config")
-
-        num_selected_configuration = sum(1 for o in [in_cluster, kubeconfig, self.config_dict] if o)
+        num_selected_configuration = sum(
+            1 for o in [in_cluster, kubeconfig, kubeconfig_path, self.config_dict] if o
+        )
 
         if num_selected_configuration > 1:
             raise AirflowException(
@@ -757,6 +759,16 @@ class AsyncKubernetesHook(KubernetesHook):
             await async_config.load_kube_config_from_dict(self.config_dict)
             return async_client.ApiClient()
 
+        if kubeconfig_path is not None:
+            self.log.debug("loading kube_config from: %s", kubeconfig_path)
+            self._is_in_cluster = False
+            await async_config.load_kube_config(
+                config_file=kubeconfig_path,
+                client_configuration=self.client_configuration,
+                context=cluster_context,
+            )
+            return async_client.ApiClient()
+
         if kubeconfig is not None:
             async with aiofiles.tempfile.NamedTemporaryFile() as temp_config:
                 self.log.debug(
diff --git a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py
index ae9514b5b8e..97d00a5c3b2 100644
--- a/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/providers/cncf/kubernetes/tests/provider_tests/cncf/kubernetes/hooks/test_kubernetes.py
@@ -24,8 +24,10 @@ from asyncio import Future
 from unittest import mock
 from unittest.mock import MagicMock, PropertyMock, patch
 
+import anyio
 import kubernetes
 import pytest
+import yaml
 from kubernetes.client import V1Deployment, V1DeploymentStatus
 from kubernetes.client.rest import ApiException
 from kubernetes.config import ConfigException
@@ -898,6 +900,39 @@ class TestAsyncKubernetesHook:
         kube_config_loader.assert_called_once()
         kube_config_merger.assert_called_once()
 
+    @pytest.mark.asyncio
+    @mock.patch(INCLUSTER_CONFIG_LOADER)
+    @mock.patch(KUBE_CONFIG_MERGER)
+    async def test_load_config_with_conn_id_kube_config_path(
+        self, kube_config_merger, incluster_config, kube_config_loader, tmp_path
+    ):
+        """Load the kube config from the connection's ``kube_config_path`` extra, not in-cluster."""
+        file_name = f"{tmp_path}/config"
+        extra = {"kube_config_path": file_name}
+        try:
+            merge_conn(
+                Connection(
+                    conn_type="kubernetes",
+                    conn_id=CONN_ID,
+                    extra=json.dumps(extra),
+                ),
+            )
+            async with await anyio.open_file(file_name, "w+") as f:
+                # anyio's AsyncFile.write is a coroutine, so dump to a string and await the write.
+                await f.write(yaml.dump({"a": "b"}))
+            hook = AsyncKubernetesHook(
+                conn_id=CONN_ID,
+                in_cluster=False,
+                config_file=None,
+                cluster_context=None,
+            )
+            await hook._load_config()
+            assert not incluster_config.called
+            kube_config_loader.assert_called_once()
+            kube_config_merger.assert_called_once()
+        finally:
+            clear_db_connections()
+
     @pytest.mark.asyncio
     @mock.patch(INCLUSTER_CONFIG_LOADER)
     @mock.patch(KUBE_CONFIG_MERGER)

Reply via email to