This is an automated email from the ASF dual-hosted git repository.

pankajkoti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2372e21d9d add service_file support to GKEPodAsyncHook (#37081)
2372e21d9d is described below

commit 2372e21d9dd44a9cb1f7cd20bbee7f1c37936faf
Author: Wei Lee <weilee...@gmail.com>
AuthorDate: Mon Feb 5 14:20:13 2024 +0800

    add service_file support to GKEPodAsyncHook (#37081)
    
    Currently, GKEPodAsyncHook does not support service_file, so credentials 
passed through "Keyfile Path" or "Keyfile JSON" are ignored. This PR fixes 
that issue. The default value of service_file in Token is None 
(https://github.com/talkiq/gcloud-aio/blob/8c8e1b39ec2e40b42212c270acb98c039267fbc5/auth/gcloud/aio/auth/token.py#L157),
 and the value returned by service_file_as_context is also None when neither 
a key file path nor key file JSON is provided, so this change won't  [...]
---
 .../google/cloud/hooks/kubernetes_engine.py        | 79 ++++++++++++----------
 .../google/cloud/hooks/test_kubernetes_engine.py   | 40 +++++++++--
 2 files changed, 76 insertions(+), 43 deletions(-)

diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py 
b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
index 72771a0c1f..7f74447827 100644
--- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py
+++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py
@@ -508,14 +508,15 @@ class GKEPodAsyncHook(GoogleBaseAsyncHook):
         :param name: Name of the pod.
         :param namespace: Name of the pod's namespace.
         """
-        async with Token(scopes=self.scopes) as token:
-            async with self.get_conn(token) as connection:
-                v1_api = async_client.CoreV1Api(connection)
-                pod: V1Pod = await v1_api.read_namespaced_pod(
-                    name=name,
-                    namespace=namespace,
-                )
-            return pod
+        async with self.service_file_as_context() as service_file:  # type: 
ignore[attr-defined]
+            async with Token(scopes=self.scopes, service_file=service_file) as 
token:
+                async with self.get_conn(token) as connection:
+                    v1_api = async_client.CoreV1Api(connection)
+                    pod: V1Pod = await v1_api.read_namespaced_pod(
+                        name=name,
+                        namespace=namespace,
+                    )
+                return pod
 
     async def delete_pod(self, name: str, namespace: str):
         """Delete a pod.
@@ -523,18 +524,21 @@ class GKEPodAsyncHook(GoogleBaseAsyncHook):
         :param name: Name of the pod.
         :param namespace: Name of the pod's namespace.
         """
-        async with Token(scopes=self.scopes) as token, self.get_conn(token) as 
connection:
-            try:
-                v1_api = async_client.CoreV1Api(connection)
-                await v1_api.delete_namespaced_pod(
-                    name=name,
-                    namespace=namespace,
-                    body=client.V1DeleteOptions(),
-                )
-            except async_client.ApiException as e:
-                # If the pod is already deleted
-                if e.status != 404:
-                    raise
+        async with self.service_file_as_context() as service_file:  # type: 
ignore[attr-defined]
+            async with Token(scopes=self.scopes, service_file=service_file) as 
token, self.get_conn(
+                token
+            ) as connection:
+                try:
+                    v1_api = async_client.CoreV1Api(connection)
+                    await v1_api.delete_namespaced_pod(
+                        name=name,
+                        namespace=namespace,
+                        body=client.V1DeleteOptions(),
+                    )
+                except async_client.ApiException as e:
+                    # If the pod is already deleted
+                    if e.status != 404:
+                        raise
 
     async def read_logs(self, name: str, namespace: str):
         """Read logs inside the pod while starting containers inside.
@@ -547,19 +551,22 @@ class GKEPodAsyncHook(GoogleBaseAsyncHook):
         :param name: Name of the pod.
         :param namespace: Name of the pod's namespace.
         """
-        async with Token(scopes=self.scopes) as token, self.get_conn(token) as 
connection:
-            try:
-                v1_api = async_client.CoreV1Api(connection)
-                logs = await v1_api.read_namespaced_pod_log(
-                    name=name,
-                    namespace=namespace,
-                    follow=False,
-                    timestamps=True,
-                )
-                logs = logs.splitlines()
-                for line in logs:
-                    self.log.info("Container logs from %s", line)
-                return logs
-            except HTTPError:
-                self.log.exception("There was an error reading the kubernetes 
API.")
-                raise
+        async with self.service_file_as_context() as service_file:  # type: 
ignore[attr-defined]
+            async with Token(scopes=self.scopes, service_file=service_file) as 
token, self.get_conn(
+                token
+            ) as connection:
+                try:
+                    v1_api = async_client.CoreV1Api(connection)
+                    logs = await v1_api.read_namespaced_pod_log(
+                        name=name,
+                        namespace=namespace,
+                        follow=False,
+                        timestamps=True,
+                    )
+                    logs = logs.splitlines()
+                    for line in logs:
+                        self.log.info("Container logs from %s", line)
+                    return logs
+                except HTTPError:
+                    self.log.exception("There was an error reading the 
kubernetes API.")
+                    raise
diff --git a/tests/providers/google/cloud/hooks/test_kubernetes_engine.py 
b/tests/providers/google/cloud/hooks/test_kubernetes_engine.py
index 954f3e018e..e52cfae870 100644
--- a/tests/providers/google/cloud/hooks/test_kubernetes_engine.py
+++ b/tests/providers/google/cloud/hooks/test_kubernetes_engine.py
@@ -317,14 +317,22 @@ class TestGKEPodAsyncHook:
         )
 
     @pytest.mark.asyncio
-    @mock.patch(GKE_STRING.format("Token"), mock.MagicMock())
+    @pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", 
None))
+    @mock.patch(GKE_STRING.format("Token"))
     @mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn"))
     
@mock.patch(GKE_STRING.format("async_client.CoreV1Api.read_namespaced_pod"))
-    async def test_get_pod(self, read_namespace_pod_mock, get_conn_mock, 
async_hook):
+    async def test_get_pod(
+        self, read_namespace_pod_mock, get_conn_mock, mock_token, async_hook, 
mock_service_file
+    ):
+        async_hook.service_file_as_context = mock.MagicMock()
+        
async_hook.service_file_as_context.return_value.__aenter__.return_value = 
mock_service_file
+
         self.make_mock_awaitable(read_namespace_pod_mock)
 
         await async_hook.get_pod(name=POD_NAME, namespace=POD_NAMESPACE)
-
+        mock_token.assert_called_with(
+            scopes=["https://www.googleapis.com/auth/cloud-platform"], 
service_file=mock_service_file
+        )
         get_conn_mock.assert_called_once()
         read_namespace_pod_mock.assert_called_with(
             name=POD_NAME,
@@ -332,14 +340,23 @@ class TestGKEPodAsyncHook:
         )
 
     @pytest.mark.asyncio
-    @mock.patch(GKE_STRING.format("Token"), mock.MagicMock())
+    @pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", 
None))
+    @mock.patch(GKE_STRING.format("Token"))
     @mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn"))
     
@mock.patch(GKE_STRING.format("async_client.CoreV1Api.delete_namespaced_pod"))
-    async def test_delete_pod(self, delete_namespaced_pod, get_conn_mock, 
async_hook):
+    async def test_delete_pod(
+        self, delete_namespaced_pod, get_conn_mock, mock_token, async_hook, 
mock_service_file
+    ):
+        async_hook.service_file_as_context = mock.MagicMock()
+        
async_hook.service_file_as_context.return_value.__aenter__.return_value = 
mock_service_file
+
         self.make_mock_awaitable(delete_namespaced_pod)
 
         await async_hook.delete_pod(name=POD_NAME, namespace=POD_NAMESPACE)
 
+        mock_token.assert_called_with(
+            scopes=["https://www.googleapis.com/auth/cloud-platform"], 
service_file=mock_service_file
+        )
         get_conn_mock.assert_called_once()
         delete_namespaced_pod.assert_called_with(
             name=POD_NAME,
@@ -348,14 +365,23 @@ class TestGKEPodAsyncHook:
         )
 
     @pytest.mark.asyncio
-    @mock.patch(GKE_STRING.format("Token"), mock.MagicMock())
+    @pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", 
None))
+    @mock.patch(GKE_STRING.format("Token"))
     @mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn"))
     
@mock.patch(GKE_STRING.format("async_client.CoreV1Api.read_namespaced_pod_log"))
-    async def test_read_logs(self, read_namespaced_pod_log, get_conn_mock, 
async_hook, caplog):
+    async def test_read_logs(
+        self, read_namespaced_pod_log, get_conn_mock, mock_token, async_hook, 
mock_service_file, caplog
+    ):
+        async_hook.service_file_as_context = mock.MagicMock()
+        
async_hook.service_file_as_context.return_value.__aenter__.return_value = 
mock_service_file
+
         self.make_mock_awaitable(read_namespaced_pod_log, result="Test string 
#1\nTest string #2\n")
 
         await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE)
 
+        mock_token.assert_called_with(
+            scopes=["https://www.googleapis.com/auth/cloud-platform"], 
service_file=mock_service_file
+        )
         get_conn_mock.assert_called_once()
         read_namespaced_pod_log.assert_called_with(
             name=POD_NAME,

Reply via email to