QiaoLiar opened a new issue, #45368: URL: https://github.com/apache/airflow/issues/45368
### Apache Airflow Provider(s) amazon ### Versions of Apache Airflow Providers 9.2.0 Even though I'm using the latest version, I think this bug exists in all historical versions. ### Apache Airflow version 2.10.1 ### Operating System Amazon Linux 2023 ### Deployment Amazon (AWS) MWAA ### Deployment details You can reproduce this bug stably without any customization. ### What happened Their dag script to connect to EKS cluster using [from airflow.providers.amazon.aws.operators.eks import EksPodOperator] returned 401 . script: ``` from airflow import DAG from airflow.operators.dummy import DummyOperator from airflow.providers.amazon.aws.operators.eks import EksPodOperator from kubernetes.client import models as k8s from datetime import datetime DEFAULT_ARGS = { 'owner': 'XXX', } with DAG( 'test2_eks_pod_operator_poc', default_args=DEFAULT_ARGS, schedule_interval=None, # trigger manually for now start_date=datetime(2024, 4, 28), catchup=False, tags=['examples'] ) as dag: start = DummyOperator(task_id='start', retries=2) end = DummyOperator(task_id='end', retries=2) test2_eks_pod_operator = EksPodOperator( task_id='test2_eks_pod_operator', region='cn-north-1', cluster_name='eks2-sas_cnnorth1', namespace='mwaa', service_account_name='default', pod_name='eks_pod_operator_poc', image='amazon/aws-cli:latest', image_pull_policy='IfNotPresent', node_selector={ 'type': 'app' }, tolerations=[ k8s.V1Toleration( effect='NoSchedule', key='type', operator='Equal', value='app' ) ], cmds=['/bin/bash', '-c'], arguments=['echo "hello world"'], is_delete_operator_pod=True, ) start >> test2_eks_pod_operator >> end ``` error log: ``` *** Reading remote log from Cloudwatch log_group: airflow-mwaa-sas-cnnorth1-Task log_stream: dag_id=test2_eks_pod_operator_poc/run_id=manual__2024-12-31T08_47_12.225701+00_00/task_id=test2_eks_pod_operator/attempt=1.log. [2024-12-31, 08:47:15 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs [2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'aws_default' [2024-12-31, 08:47:16 UTC] {baseoperator.py:405} WARNING - EksPodOperator.execute cannot be called outside TaskInstance! [2024-12-31, 08:47:16 UTC] {pod.py:1133} INFO - Building pod eks-pod-operator-poc-ylyc6uh3 with labels: {'dag_id': 'test2_eks_pod_operator_poc', 'task_id': 'test2_eks_pod_operator', 'run_id': 'manual__2024-12-31T084712.2257010000-a8af56277', 'kubernetes_pod_operator': 'True', 'try_number': '1'} [2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'kubernetes_default' [2024-12-31, 08:47:19 UTC] {taskinstance.py:3310} ERROR - Task failed with exception Traceback (most recent call last): File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task result = _execute_callable(context=context, **execute_callable_kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable return ExecutionCallableRunner( ^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run return self.func(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/operators/eks.py", line 1103, in execute return super().execute(context) ^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper return func(self, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 593, in execute return self.execute_sync(context) ^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 603, in execute_sync self.pod = self.get_or_create_pod( # must set `self.pod` for `on_kill` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 561, in get_or_create_pod pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 534, in find_pod pod_list = self.client.list_namespaced_pod( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15823, in list_namespaced_pod return self.list_namespaced_pod_with_http_info(namespace, **kwargs) # noqa: E501 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15942, in list_namespaced_pod_with_http_info return self.api_client.call_api( ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api return self.__call_api(resource_path, method, ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api response_data = self.request( ^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request return self.rest_client.GET(url, ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 244, in GET return self.request("GET", url, ^^^^^^^^^^^^^^^^^^^^^^^^ File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 238, in request raise ApiException(http_resp=r) kubernetes.client.exceptions.ApiException: (401) Reason: Unauthorized HTTP response headers: HTTPHeaderDict({'Audit-Id': '6bab071a-ed5b-41b9-9df7-d76d7247ebcd', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 31 Dec 2024 08:47:19 GMT', 'Content-Length': '129'}) HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401} [2024-12-31, 08:47:19 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=test2_eks_pod_operator_poc, task_id=test2_eks_pod_operator, run_id=manual__2024-12-31T08:47:12.225701+00:00, execution_date=20241231T084712, start_date=20241231T084715, end_date=20241231T084719 [2024-12-31, 08:47:19 UTC] {taskinstance.py:340} ▶ Post task execution logs ``` ### What you think should happen instead ### Here's the investigation I've done: #### Examining the source code for EksPodOperator shows that this class automatically generates a kube_config file if no external kube_config file is specified during initialization: ``` https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/operators/eks.html # There is no need to manage the kube_config file, as it will be generated automatically. # All Kubernetes parameters (except config_file) are also valid for the EksPodOperator. ``` #### There seems to be a problem with this auto-generated kube_config file, so I printed the contents of the file in debug and examined the source code associated with it generating the contents of the file: ``` [docs] def execute(self, context: Context): eks_hook = EksHook( aws_conn_id=self.aws_conn_id, region_name=self.region, ) with eks_hook.generate_config_file( eks_cluster_name=self.cluster_name, pod_namespace=self.namespace ) as self.config_file: return super().execute(context) ``` #### https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html ``` cluster_config = { "apiVersion": "v1", "kind": "Config", "clusters": [ { "cluster": {"server": cluster_ep, "certificate-authority-data": cluster_cert}, "name": eks_cluster_name, } ], "contexts": [ { "context": { "cluster": eks_cluster_name, "namespace": pod_namespace, "user": _POD_USERNAME, }, "name": _CONTEXT_NAME, } ], "current-context": _CONTEXT_NAME, "preferences": {}, "users": [ { "name": _POD_USERNAME, "user": { "exec": { "apiVersion": AUTHENTICATION_API_VERSION, "command": "sh", "args": [ "-c", COMMAND.format( python_executable=python_executable, eks_cluster_name=eks_cluster_name, args=args, ), ], "interactiveMode": "Never", } }, } ], } ``` #### Here it is executing a bash command, searching the COMMAND variable you can see the exact command executed as follows, you can see it is getting the eks token. ``` COMMAND = """ output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \ --cluster-name {eks_cluster_name} {args} 2>&1) if [ $? -ne 0 ]; then echo "Error running the script" exit 1 fi expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp: \\K[^,]+') token=$(echo "$output" | grep -oP 'token: \\K[^,]+') json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \ "client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \ {{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token") echo $json_string """ ``` #### https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/utils/eks_get_token.html ``` [docs]def main(): parser = get_parser() args = parser.parse_args() eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name) access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name) access_token_expiration = get_expiration_time() print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}") ``` #### access_token from eks_hook.fetch_access_token_for_cluster(args.cluster_name) Check out the implementation of the eks_hook.fetch_access_token_for_cluster method: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html ``` def fetch_access_token_for_cluster(self, eks_cluster_name: str) -> str: session = self.get_session() service_id = self.conn.meta.service_model.service_id sts_url = ( f"https://sts.{session.region_name}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15" ) ``` ### The address to access STS here points to the global address, not the China STS service address. So the eks token obtained cannot be used in China. The sts_url that should be used in China is f “https://sts.{session.region_name}.amazonaws.com.cn/?Action=GetCallerIdentity&Version=2011-06-15” ### How to reproduce You can easily reproduce this using the dag script above, provided you use the identity credentials of your China AWS account. ### Anything else no ### Are you willing to submit PR? - [ ] Yes I am willing to submit a PR! ### Code of Conduct - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org