This is an automated email from the ASF dual-hosted git repository. pankajkoti pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 2372e21d9d add service_file support to GKEPodAsyncHook (#37081) 2372e21d9d is described below commit 2372e21d9dd44a9cb1f7cd20bbee7f1c37936faf Author: Wei Lee <weilee...@gmail.com> AuthorDate: Mon Feb 5 14:20:13 2024 +0800 add service_file support to GKEPodAsyncHook (#37081) Currently, GKEPodAsyncHook does not support service_file. Thus, passing credentials through " Keyfile Path " or " Keyfile JSON " will be ignored. This PR intends to fix this issue. As the default value of service_file in Token is None (https://github.com/talkiq/gcloud-aio/blob/8c8e1b39ec2e40b42212c270acb98c039267fbc5/auth/gcloud/aio/auth/token.py#L157) and the return value of service_file_as_context when both key file path and key file json are not provided is None. This change won't [...] --- .../google/cloud/hooks/kubernetes_engine.py | 79 ++++++++++++---------- .../google/cloud/hooks/test_kubernetes_engine.py | 40 +++++++++-- 2 files changed, 76 insertions(+), 43 deletions(-) diff --git a/airflow/providers/google/cloud/hooks/kubernetes_engine.py b/airflow/providers/google/cloud/hooks/kubernetes_engine.py index 72771a0c1f..7f74447827 100644 --- a/airflow/providers/google/cloud/hooks/kubernetes_engine.py +++ b/airflow/providers/google/cloud/hooks/kubernetes_engine.py @@ -508,14 +508,15 @@ class GKEPodAsyncHook(GoogleBaseAsyncHook): :param name: Name of the pod. :param namespace: Name of the pod's namespace. """ - async with Token(scopes=self.scopes) as token: - async with self.get_conn(token) as connection: - v1_api = async_client.CoreV1Api(connection) - pod: V1Pod = await v1_api.read_namespaced_pod( - name=name, - namespace=namespace, - ) - return pod + async with self.service_file_as_context() as service_file: # type: ignore[attr-defined] + async with Token(scopes=self.scopes, service_file=service_file) as token: + async with self.get_conn(token) as connection: + v1_api = async_client.CoreV1Api(connection) + pod: V1Pod = await v1_api.read_namespaced_pod( + name=name, + namespace=namespace, + ) + return pod async def delete_pod(self, name: str, namespace: str): """Delete a pod. @@ -523,18 +524,21 @@ class GKEPodAsyncHook(GoogleBaseAsyncHook): :param name: Name of the pod. :param namespace: Name of the pod's namespace. """ - async with Token(scopes=self.scopes) as token, self.get_conn(token) as connection: - try: - v1_api = async_client.CoreV1Api(connection) - await v1_api.delete_namespaced_pod( - name=name, - namespace=namespace, - body=client.V1DeleteOptions(), - ) - except async_client.ApiException as e: - # If the pod is already deleted - if e.status != 404: - raise + async with self.service_file_as_context() as service_file: # type: ignore[attr-defined] + async with Token(scopes=self.scopes, service_file=service_file) as token, self.get_conn( + token + ) as connection: + try: + v1_api = async_client.CoreV1Api(connection) + await v1_api.delete_namespaced_pod( + name=name, + namespace=namespace, + body=client.V1DeleteOptions(), + ) + except async_client.ApiException as e: + # If the pod is already deleted + if e.status != 404: + raise async def read_logs(self, name: str, namespace: str): """Read logs inside the pod while starting containers inside. @@ -547,19 +551,22 @@ class GKEPodAsyncHook(GoogleBaseAsyncHook): :param name: Name of the pod. :param namespace: Name of the pod's namespace. """ - async with Token(scopes=self.scopes) as token, self.get_conn(token) as connection: - try: - v1_api = async_client.CoreV1Api(connection) - logs = await v1_api.read_namespaced_pod_log( - name=name, - namespace=namespace, - follow=False, - timestamps=True, - ) - logs = logs.splitlines() - for line in logs: - self.log.info("Container logs from %s", line) - return logs - except HTTPError: - self.log.exception("There was an error reading the kubernetes API.") - raise + async with self.service_file_as_context() as service_file: # type: ignore[attr-defined] + async with Token(scopes=self.scopes, service_file=service_file) as token, self.get_conn( + token + ) as connection: + try: + v1_api = async_client.CoreV1Api(connection) + logs = await v1_api.read_namespaced_pod_log( + name=name, + namespace=namespace, + follow=False, + timestamps=True, + ) + logs = logs.splitlines() + for line in logs: + self.log.info("Container logs from %s", line) + return logs + except HTTPError: + self.log.exception("There was an error reading the kubernetes API.") + raise diff --git a/tests/providers/google/cloud/hooks/test_kubernetes_engine.py b/tests/providers/google/cloud/hooks/test_kubernetes_engine.py index 954f3e018e..e52cfae870 100644 --- a/tests/providers/google/cloud/hooks/test_kubernetes_engine.py +++ b/tests/providers/google/cloud/hooks/test_kubernetes_engine.py @@ -317,14 +317,22 @@ class TestGKEPodAsyncHook: ) @pytest.mark.asyncio - @mock.patch(GKE_STRING.format("Token"), mock.MagicMock()) + @pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", None)) + @mock.patch(GKE_STRING.format("Token")) @mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn")) @mock.patch(GKE_STRING.format("async_client.CoreV1Api.read_namespaced_pod")) - async def test_get_pod(self, read_namespace_pod_mock, get_conn_mock, async_hook): + async def test_get_pod( + self, read_namespace_pod_mock, get_conn_mock, mock_token, async_hook, mock_service_file + ): + async_hook.service_file_as_context = mock.MagicMock() + async_hook.service_file_as_context.return_value.__aenter__.return_value = mock_service_file + self.make_mock_awaitable(read_namespace_pod_mock) await async_hook.get_pod(name=POD_NAME, namespace=POD_NAMESPACE) - + mock_token.assert_called_with( + scopes=["https://www.googleapis.com/auth/cloud-platform"], service_file=mock_service_file + ) get_conn_mock.assert_called_once() read_namespace_pod_mock.assert_called_with( name=POD_NAME, @@ -332,14 +340,23 @@ class TestGKEPodAsyncHook: ) @pytest.mark.asyncio - @mock.patch(GKE_STRING.format("Token"), mock.MagicMock()) + @pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", None)) + @mock.patch(GKE_STRING.format("Token")) @mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn")) @mock.patch(GKE_STRING.format("async_client.CoreV1Api.delete_namespaced_pod")) - async def test_delete_pod(self, delete_namespaced_pod, get_conn_mock, async_hook): + async def test_delete_pod( + self, delete_namespaced_pod, get_conn_mock, mock_token, async_hook, mock_service_file + ): + async_hook.service_file_as_context = mock.MagicMock() + async_hook.service_file_as_context.return_value.__aenter__.return_value = mock_service_file + self.make_mock_awaitable(delete_namespaced_pod) await async_hook.delete_pod(name=POD_NAME, namespace=POD_NAMESPACE) + mock_token.assert_called_with( + scopes=["https://www.googleapis.com/auth/cloud-platform"], service_file=mock_service_file + ) get_conn_mock.assert_called_once() delete_namespaced_pod.assert_called_with( name=POD_NAME, @@ -348,14 +365,23 @@ class TestGKEPodAsyncHook: ) @pytest.mark.asyncio - @mock.patch(GKE_STRING.format("Token"), mock.MagicMock()) + @pytest.mark.parametrize("mock_service_file", ("/tmp/service_file.json", None)) + @mock.patch(GKE_STRING.format("Token")) @mock.patch(GKE_STRING.format("GKEPodAsyncHook.get_conn")) @mock.patch(GKE_STRING.format("async_client.CoreV1Api.read_namespaced_pod_log")) - async def test_read_logs(self, read_namespaced_pod_log, get_conn_mock, async_hook, caplog): + async def test_read_logs( + self, read_namespaced_pod_log, get_conn_mock, mock_token, async_hook, mock_service_file, caplog + ): + async_hook.service_file_as_context = mock.MagicMock() + async_hook.service_file_as_context.return_value.__aenter__.return_value = mock_service_file + self.make_mock_awaitable(read_namespaced_pod_log, result="Test string #1\nTest string #2\n") await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE) + mock_token.assert_called_with( + scopes=["https://www.googleapis.com/auth/cloud-platform"], service_file=mock_service_file + ) get_conn_mock.assert_called_once() read_namespaced_pod_log.assert_called_with( name=POD_NAME,