QiaoLiar opened a new issue, #45368:
URL: https://github.com/apache/airflow/issues/45368

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   9.2.0
   
   Although I'm using the latest version, I believe this bug exists in all
historical versions.
   
   ### Apache Airflow version
   
   2.10.1
   
   ### Operating System
   
   Amazon Linux 2023
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   This bug can be reproduced reliably without any customization.
   
   ### What happened
   
   A DAG script that connects to an EKS cluster using `from
airflow.providers.amazon.aws.operators.eks import EksPodOperator` fails with a 401 Unauthorized error.
   
   script:
   ```
   from airflow import DAG
   from airflow.operators.dummy import DummyOperator
   from airflow.providers.amazon.aws.operators.eks import EksPodOperator
   from kubernetes.client import models as k8s
   
   from datetime import datetime
   
   DEFAULT_ARGS = {
       'owner': 'XXX',
   }
   
   
   with DAG(
           'test2_eks_pod_operator_poc',
           default_args=DEFAULT_ARGS,
           schedule_interval=None,  # trigger manually for now
           start_date=datetime(2024, 4, 28),
           catchup=False,
           tags=['examples']
   ) as dag:
   
       start = DummyOperator(task_id='start', retries=2)
       end = DummyOperator(task_id='end', retries=2)
   
       test2_eks_pod_operator = EksPodOperator(
           task_id='test2_eks_pod_operator',
           region='cn-north-1',
           cluster_name='eks2-sas_cnnorth1',
           namespace='mwaa',
           service_account_name='default',
           pod_name='eks_pod_operator_poc',
           image='amazon/aws-cli:latest',
           image_pull_policy='IfNotPresent',
           node_selector={
               'type': 'app'
           },
           tolerations=[
               k8s.V1Toleration(
                   effect='NoSchedule',
                   key='type',
                   operator='Equal',
                   value='app'
               )
           ],
           cmds=['/bin/bash', '-c'],
           arguments=['echo "hello world"'],
           is_delete_operator_pod=True,
       )
   
       start >> test2_eks_pod_operator >> end
   ```
   
   error log:
   ```
   *** Reading remote log from Cloudwatch log_group: 
airflow-mwaa-sas-cnnorth1-Task log_stream: 
dag_id=test2_eks_pod_operator_poc/run_id=manual__2024-12-31T08_47_12.225701+00_00/task_id=test2_eks_pod_operator/attempt=1.log.
   [2024-12-31, 08:47:15 UTC] {local_task_job_runner.py:123} ▶ Pre task 
execution logs
   [2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 
'aws_default'
   [2024-12-31, 08:47:16 UTC] {baseoperator.py:405} WARNING - 
EksPodOperator.execute cannot be called outside TaskInstance!
   [2024-12-31, 08:47:16 UTC] {pod.py:1133} INFO - Building pod 
eks-pod-operator-poc-ylyc6uh3 with labels: {'dag_id': 
'test2_eks_pod_operator_poc', 'task_id': 'test2_eks_pod_operator', 'run_id': 
'manual__2024-12-31T084712.2257010000-a8af56277', 'kubernetes_pod_operator': 
'True', 'try_number': '1'}
   [2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 
'kubernetes_default'
   [2024-12-31, 08:47:19 UTC] {taskinstance.py:3310} ERROR - Task failed with 
exception
   Traceback (most recent call last):
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 767, in _execute_task
       result = _execute_callable(context=context, **execute_callable_kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py",
 line 733, in _execute_callable
       return ExecutionCallableRunner(
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py",
 line 252, in run
       return self.func(*args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py",
 line 406, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/operators/eks.py",
 line 1103, in execute
       return super().execute(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py",
 line 406, in wrapper
       return func(self, *args, **kwargs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 593, in execute
       return self.execute_sync(context)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 603, in execute_sync
       self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 561, in get_or_create_pod
       pod = self.find_pod(self.namespace or 
pod_request_obj.metadata.namespace, context=context)
             
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py",
 line 534, in find_pod
       pod_list = self.client.list_namespaced_pod(
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py",
 line 15823, in list_namespaced_pod
       return self.list_namespaced_pod_with_http_info(namespace, **kwargs)  # 
noqa: E501
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py",
 line 15942, in list_namespaced_pod_with_http_info
       return self.api_client.call_api(
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py",
 line 348, in call_api
       return self.__call_api(resource_path, method,
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py",
 line 180, in __call_api
       response_data = self.request(
                       ^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py",
 line 373, in request
       return self.rest_client.GET(url,
              ^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py",
 line 244, in GET
       return self.request("GET", url,
              ^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py",
 line 238, in request
       raise ApiException(http_resp=r)
   kubernetes.client.exceptions.ApiException: (401)
   Reason: Unauthorized
   HTTP response headers: HTTPHeaderDict({'Audit-Id': 
'6bab071a-ed5b-41b9-9df7-d76d7247ebcd', 'Cache-Control': 'no-cache, private', 
'Content-Type': 'application/json', 'Date': 'Tue, 31 Dec 2024 08:47:19 GMT', 
'Content-Length': '129'})
   HTTP response body: 
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
   
   
   [2024-12-31, 08:47:19 UTC] {taskinstance.py:1225} INFO - Marking task as 
FAILED. dag_id=test2_eks_pod_operator_poc, task_id=test2_eks_pod_operator, 
run_id=manual__2024-12-31T08:47:12.225701+00:00, 
execution_date=20241231T084712, start_date=20241231T084715, 
end_date=20241231T084719
   [2024-12-31, 08:47:19 UTC] {taskinstance.py:340} ▶ Post task execution logs
   ```
   
   ### What you think should happen instead

   The `EksPodOperator` should authenticate successfully against EKS clusters in AWS China regions; the token it generates should target the China-partition STS endpoint.
   
   ### Here's the investigation I've done:
   
   #### Examining the source code for `EksPodOperator` shows that the class
automatically generates a kube_config file when no external kube_config file is
specified at initialization:
   https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/operators/eks.html
   ```
           # There is no need to manage the kube_config file, as it will be generated automatically.
           # All Kubernetes parameters (except config_file) are also valid for the EksPodOperator.
   ```
   #### The auto-generated kube_config file appeared to be the problem, so I
printed its contents while debugging and examined the source code that
generates it:
   ```
       def execute(self, context: Context):
           eks_hook = EksHook(
               aws_conn_id=self.aws_conn_id,
               region_name=self.region,
           )
           with eks_hook.generate_config_file(
               eks_cluster_name=self.cluster_name, pod_namespace=self.namespace
           ) as self.config_file:
               return super().execute(context)
   ```
   #### 
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html
   ```
           cluster_config = {
               "apiVersion": "v1",
               "kind": "Config",
               "clusters": [
                   {
                        "cluster": {"server": cluster_ep, "certificate-authority-data": cluster_cert},
                       "name": eks_cluster_name,
                   }
               ],
               "contexts": [
                   {
                       "context": {
                           "cluster": eks_cluster_name,
                           "namespace": pod_namespace,
                           "user": _POD_USERNAME,
                       },
                       "name": _CONTEXT_NAME,
                   }
               ],
               "current-context": _CONTEXT_NAME,
               "preferences": {},
               "users": [
                   {
                       "name": _POD_USERNAME,
                       "user": {
                           "exec": {
                               "apiVersion": AUTHENTICATION_API_VERSION,
                               "command": "sh",
                               "args": [
                                   "-c",
                                   COMMAND.format(
                                       python_executable=python_executable,
                                       eks_cluster_name=eks_cluster_name,
                                       args=args,
                                   ),
                               ],
                               "interactiveMode": "Never",
                           }
                       },
                   }
               ],
           }
   ```
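For reference, a dict shaped like the one above is ultimately written out to a temporary file that the Kubernetes client consumes as its kubeconfig. A minimal sketch of that mechanism (illustrative only; `generate_config_file` in the hook uses its own helpers, and the sample values and filename here are made up):

```python
import json
import tempfile

# Illustrative stand-in for the generated config (not the real hook output).
cluster_config = {
    "apiVersion": "v1",
    "kind": "Config",
    "clusters": [{"cluster": {"server": "https://example.eks.amazonaws.com"}, "name": "demo"}],
    "current-context": "aws",
}

# JSON is a subset of YAML, so a JSON dump is a valid kubeconfig file.
with tempfile.NamedTemporaryFile(mode="w", suffix=".kubeconfig", delete=False) as f:
    json.dump(cluster_config, f)
    config_path = f.name
```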
   #### The exec entry above runs a shell command. Searching for the `COMMAND`
variable shows the exact command executed; it fetches the EKS token:
   ```
   COMMAND = """
               output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \
                   --cluster-name {eks_cluster_name} {args} 2>&1)

               if [ $? -ne 0 ]; then
                   echo "Error running the script"
                   exit 1
               fi

               expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp: \\K[^,]+')
               token=$(echo "$output" | grep -oP 'token: \\K[^,]+')

               json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \
                   "client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \
                   {{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token")
               echo $json_string
               """
   ```
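The pipeline above scrapes the token and expiration out of a line of the form `expirationTimestamp: <ts>, token: <tok>` and wraps them in an `ExecCredential` JSON document. A rough Python equivalent of that extraction, on a made-up sample line (a sketch for clarity, not provider code):

```python
import re

# Sample output line in the format printed by eks_get_token (values made up).
sample = "expirationTimestamp: 2024-12-31T09:00:00Z, token: k8s-aws-v1.EXAMPLE"

# Same captures as the `grep -oP '... \K[^,]+'` calls in the shell command.
expiration = re.search(r"expirationTimestamp: ([^,]+)", sample).group(1)
token = re.search(r"token: ([^,]+)", sample).group(1)

# Shape of the ExecCredential document that the printf emits.
exec_credential = {
    "kind": "ExecCredential",
    "apiVersion": "client.authentication.k8s.io/v1alpha1",
    "spec": {},
    "status": {"expirationTimestamp": expiration, "token": token},
}
```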
   #### 
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/utils/eks_get_token.html
   ```
   def main():
       parser = get_parser()
       args = parser.parse_args()
       eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name)
       access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name)
       access_token_expiration = get_expiration_time()
       print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}")
   ```
   #### The access token comes from
`eks_hook.fetch_access_token_for_cluster(args.cluster_name)`. The implementation
of that method is here:
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html
   ```
   def fetch_access_token_for_cluster(self, eks_cluster_name: str) -> str:
       session = self.get_session()
       service_id = self.conn.meta.service_model.service_id
       sts_url = (
           f"https://sts.{session.region_name}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15"
       )
   ```
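Why this URL matters: as I understand it, the EKS bearer token is essentially a presigned STS `GetCallerIdentity` URL, base64url-encoded without padding and prefixed with `k8s-aws-v1.`; the cluster verifies the caller by replaying that URL against STS. A simplified sketch of the encoding step (the presigning itself is omitted, and the helper name is mine, not the hook's):

```python
import base64

def to_eks_token(presigned_sts_url: str) -> str:
    # The cluster decodes this token and validates the embedded presigned
    # URL, which is why a global-partition STS URL is rejected by a
    # cluster running in the aws-cn partition.
    encoded = base64.urlsafe_b64encode(presigned_sts_url.encode()).rstrip(b"=")
    return "k8s-aws-v1." + encoded.decode()
```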
   #### The `sts_url` here points at the global STS endpoint, not the China STS
service endpoint, so the EKS token obtained cannot be used for clusters in
China. The URL that should be used in China is
`f"https://sts.{session.region_name}.amazonaws.com.cn/?Action=GetCallerIdentity&Version=2011-06-15"`.
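
A hedged sketch of what a partition-aware fix could look like (the `cn-` prefix check and the function name are my assumptions, not an actual patch; a proper fix would presumably resolve the endpoint through botocore's endpoint resolver):

```python
def sts_url_for_region(region_name: str) -> str:
    # AWS China regions (cn-north-1, cn-northwest-1) belong to the aws-cn
    # partition, whose service domain is amazonaws.com.cn.
    domain = "amazonaws.com.cn" if region_name.startswith("cn-") else "amazonaws.com"
    return f"https://sts.{region_name}.{domain}/?Action=GetCallerIdentity&Version=2011-06-15"
```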
   
   ### How to reproduce
   
   The DAG script above reproduces the issue, provided you run it with the
credentials of an AWS China account and an EKS cluster in a China region.
   
   ### Anything else
   
   no
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
