This is an automated email from the ASF dual-hosted git repository.

onikolas pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new ac977c4e57 Make EksPodOperator exec config not rely on log level (#35771) ac977c4e57 is described below commit ac977c4e5740041911c72c145d50545b64ff6f78 Author: Syed Hussain <103602455+syeda...@users.noreply.github.com> AuthorDate: Wed Nov 22 14:13:41 2023 -0800 Make EksPodOperator exec config not rely on log level (#35771) * Wrap bash inline script around eks_get_token.py to suppress log output --- airflow/providers/amazon/aws/hooks/eks.py | 55 ++++++++++++++-------- .../providers/amazon/aws/utils/eks_get_token.py | 9 +--- tests/providers/amazon/aws/hooks/test_eks.py | 42 ++++++++++------- .../amazon/aws/utils/test_eks_get_token.py | 19 +++----- 4 files changed, 66 insertions(+), 59 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/eks.py b/airflow/providers/amazon/aws/hooks/eks.py index f6e3ed83f8..24a22c6a2b 100644 --- a/airflow/providers/amazon/aws/hooks/eks.py +++ b/airflow/providers/amazon/aws/hooks/eks.py @@ -75,6 +75,25 @@ class NodegroupStates(Enum): NONEXISTENT = "NONEXISTENT" +COMMAND = """ + output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \ + --cluster-name {eks_cluster_name} {args} 2>&1) + + if [ $? -ne 0 ]; then + echo "Error running the script" + exit 1 + fi + + expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp:\s*\K[^,]+') + token=$(echo "$output" | grep -oP 'token:\s*\K[^,]+') + + json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \ + "client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \ + {{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token") + echo $json_string + """ + + class EksHook(AwsBaseHook): """ Interact with Amazon Elastic Kubernetes Service (EKS). @@ -521,6 +540,16 @@ class EksHook(AwsBaseHook): :param eks_cluster_name: The name of the cluster to generate kubeconfig file for. :param pod_namespace: The namespace to run within kubernetes. 
""" + args = "" + if self.region_name is not None: + args = args + f" --region-name {self.region_name}" + + if self.aws_conn_id is not None: + args = args + f" --aws-conn-id {self.aws_conn_id}" + + # We need to determine which python executable the host is running in order to correctly + # call the eks_get_token.py script. + python_executable = f"python{sys.version_info[0]}.{sys.version_info[1]}" # Set up the client eks_client = self.conn @@ -556,28 +585,14 @@ class EksHook(AwsBaseHook): "user": { "exec": { "apiVersion": AUTHENTICATION_API_VERSION, - "command": sys.executable, + "command": "sh", "args": [ - "-m", - "airflow.providers.amazon.aws.utils.eks_get_token", - *( - ["--region-name", self.region_name] - if self.region_name is not None - else [] + "-c", + COMMAND.format( + python_executable=python_executable, + eks_cluster_name=eks_cluster_name, + args=args, ), - *( - ["--aws-conn-id", self.aws_conn_id] - if self.aws_conn_id is not None - else [] - ), - "--cluster-name", - eks_cluster_name, - ], - "env": [ - { - "name": "AIRFLOW__LOGGING__LOGGING_LEVEL", - "value": "FATAL", - } ], "interactiveMode": "Never", } diff --git a/airflow/providers/amazon/aws/utils/eks_get_token.py b/airflow/providers/amazon/aws/utils/eks_get_token.py index 27bf51676f..9a340671ef 100644 --- a/airflow/providers/amazon/aws/utils/eks_get_token.py +++ b/airflow/providers/amazon/aws/utils/eks_get_token.py @@ -17,7 +17,6 @@ from __future__ import annotations import argparse -import json from datetime import datetime, timedelta, timezone from airflow.providers.amazon.aws.hooks.eks import EksHook @@ -58,13 +57,7 @@ def main(): eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name) access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name) access_token_expiration = get_expiration_time() - exec_credential_object = { - "kind": "ExecCredential", - "apiVersion": "client.authentication.k8s.io/v1alpha1", - "spec": {}, - "status": {"expirationTimestamp": 
access_token_expiration, "token": access_token}, - } - print(json.dumps(exec_credential_object)) + print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}") if __name__ == "__main__": diff --git a/tests/providers/amazon/aws/hooks/test_eks.py b/tests/providers/amazon/aws/hooks/test_eks.py index 35cb246091..e35c0eba1f 100644 --- a/tests/providers/amazon/aws/hooks/test_eks.py +++ b/tests/providers/amazon/aws/hooks/test_eks.py @@ -52,7 +52,7 @@ from moto.eks.models import ( NODEGROUP_NOT_FOUND_MSG, ) -from airflow.providers.amazon.aws.hooks.eks import EksHook +from airflow.providers.amazon.aws.hooks.eks import COMMAND, EksHook from ..utils.eks_test_constants import ( DEFAULT_CONN_ID, @@ -1198,6 +1198,8 @@ class TestEksHooks: class TestEksHook: + python_executable = f"python{sys.version_info[0]}.{sys.version_info[1]}" + @mock.patch("airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook.conn") @pytest.mark.parametrize( "aws_conn_id, region_name, expected_args", @@ -1206,32 +1208,37 @@ class TestEksHook: "test-id", "test-region", [ - "-m", - "airflow.providers.amazon.aws.utils.eks_get_token", - "--region-name", - "test-region", - "--aws-conn-id", - "test-id", - "--cluster-name", - "test-cluster", + "-c", + COMMAND.format( + python_executable=python_executable, + eks_cluster_name="test-cluster", + args=" --region-name test-region --aws-conn-id test-id", + ), ], ], [ None, "test-region", [ - "-m", - "airflow.providers.amazon.aws.utils.eks_get_token", - "--region-name", - "test-region", - "--cluster-name", - "test-cluster", + "-c", + COMMAND.format( + python_executable=python_executable, + eks_cluster_name="test-cluster", + args=" --region-name test-region", + ), ], ], [ None, None, - ["-m", "airflow.providers.amazon.aws.utils.eks_get_token", "--cluster-name", "test-cluster"], + [ + "-c", + COMMAND.format( + python_executable=python_executable, + eks_cluster_name="test-cluster", + args="", + ), + ], ], ], ) @@ -1271,8 +1278,7 @@ class TestEksHook: 
"exec": { "apiVersion": "client.authentication.k8s.io/v1alpha1", "args": expected_args, - "command": sys.executable, - "env": [{"name": "AIRFLOW__LOGGING__LOGGING_LEVEL", "value": "FATAL"}], + "command": "sh", "interactiveMode": "Never", } }, diff --git a/tests/providers/amazon/aws/utils/test_eks_get_token.py b/tests/providers/amazon/aws/utils/test_eks_get_token.py index c8d66de0db..8825c6c218 100644 --- a/tests/providers/amazon/aws/utils/test_eks_get_token.py +++ b/tests/providers/amazon/aws/utils/test_eks_get_token.py @@ -17,12 +17,10 @@ from __future__ import annotations import contextlib -import json import os import runpy from io import StringIO from unittest import mock -from unittest.mock import ANY import pytest import time_machine @@ -76,17 +74,12 @@ class TestGetEksToken: os.chdir(AIRFLOW_MAIN_FOLDER) # We are not using run_module because of https://github.com/pytest-dev/pytest/issues/9007 runpy.run_path("airflow/providers/amazon/aws/utils/eks_get_token.py", run_name="__main__") - json_output = json.loads(temp_stdout.getvalue()) - assert { - "apiVersion": "client.authentication.k8s.io/v1alpha1", - "kind": "ExecCredential", - "spec": {}, - "status": { - "expirationTimestamp": ANY, # depending on local timezone, this can be different - "token": "k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t", - }, - } == json_output - assert json_output["status"]["expirationTimestamp"].startswith("1995-02-") + output = temp_stdout.getvalue() + token = "token: k8s-aws-v1.aHR0cDovL2V4YW1wbGUuY29t" + expected_token = output.split(",")[1].strip() + expected_expiration_timestamp = output.split(",")[0].split(":")[1].strip() + assert expected_token == token + assert expected_expiration_timestamp.startswith("1995-02-") mock_eks_hook.assert_called_once_with( aws_conn_id=expected_aws_conn_id, region_name=expected_region_name )