shamallah opened a new issue, #64657:
URL: https://github.com/apache/airflow/issues/64657

   ### Apache Airflow Provider(s)
   
   amazon
   
   ### Versions of Apache Airflow Providers
   
   apache-airflow-providers-amazon (installed with MWAA 3.0.6 / Airflow 3.x)
   
   ### Apache Airflow version
   
   3.0.6 (via Amazon MWAA)
   
   ### Operating System
   
   Amazon MWAA (Amazon Linux 2 / Fargate)
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   Amazon (MWAA)
   
   ### What happened
   
   The `EksPodOperator` fails with a 401 Unauthorized error when authenticating 
to an EKS cluster using cross-account AssumeRole credentials configured via an 
Airflow AWS connection (`aws_conn_id`).
   
   The operator correctly uses the assumed role credentials for the 
`describe_cluster` API call (which succeeds), but the bearer token generated by 
the kubeconfig exec plugin does not carry the assumed role identity. The EKS 
API server audit logs show `"user":{}` — an empty user object — indicating the 
token is either malformed or was signed with invalid/no credentials.
   
   Same-account usage (no `aws_conn_id`, using the default MWAA execution role) 
works correctly. The issue is specific to the cross-account credential 
passthrough in the kubeconfig exec plugin mechanism.
   
   ### What you think should happen instead
   
   The `EksPodOperator` should successfully authenticate to the EKS cluster 
using the assumed role credentials from the `aws_conn_id` connection, in both 
same-account and cross-account scenarios.
   
   ### How to reproduce
   
   ### Setup
   
   - MWAA environment in Account A
   - EKS cluster in Account B
   - IAM role in Account B (`cross-account-role`) with:
     - Trust policy allowing the MWAA execution role from Account A to assume it
     - `eks:DescribeCluster` permission on the EKS cluster
   - EKS access entry for `cross-account-role` with `AmazonEKSEditPolicy` 
scoped to the target namespace
   - Airflow AWS connection (`cluster_connection`) with Extra:
     ```json
     {
       "role_arn": "arn:aws:iam::<ACCOUNT_B>:role/cross-account-role",
       "region_name": "eu-west-1"
     }
     ```
   
   ### Verification that credentials work
   
   The following DAG confirms the Airflow connection correctly assumes the 
cross-account role:
   
   ```python
   from airflow import DAG
   from airflow.operators.python import PythonOperator
   from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
   from datetime import datetime
   
   def verify_identity():
       hook = AwsBaseHook(aws_conn_id="cluster_connection", client_type="sts")
       client = hook.get_conn()
       identity = client.get_caller_identity()
       print(f"Account: {identity['Account']}, ARN: {identity['Arn']}")
   
   with DAG(
       dag_id="verify_identity",
       start_date=datetime(2026, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       PythonOperator(task_id="verify", python_callable=verify_identity)
   ```
   
   Output confirms the assumed role identity in Account B.
   
   Additionally, generating an EKS token via subprocess using the same assumed 
role credentials succeeds:
   
   ```python
   # Using frozen credentials from
   # EksHook(aws_conn_id="cluster_connection").get_session(),
   # passed as env vars to:
   #   aws eks get-token --cluster-name <cluster> --region <region>
   # Result: valid k8s-aws-v1.* token, exit code 0
   ```
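
A minimal sketch of that subprocess check, for anyone who wants to repeat it. The helper names `build_get_token_call` and `run_get_token` are hypothetical and not part of the provider. The sketch assumes the `aws` CLI is on `PATH` and that the three values come from the frozen credentials (`access_key`, `secret_key`, `token`) of the session mentioned above.

```python
import os
import subprocess
from typing import Dict, List, Mapping, Optional, Tuple


def build_get_token_call(
    access_key: str,
    secret_key: str,
    session_token: str,
    cluster_name: str,
    region: str,
    base_env: Optional[Mapping[str, str]] = None,
) -> Tuple[List[str], Dict[str, str]]:
    """Return the CLI command and the environment carrying the assumed-role keys."""
    # Start from the given environment (or the current one) and overlay the keys.
    env = dict(os.environ if base_env is None else base_env)
    env.update(
        {
            "AWS_ACCESS_KEY_ID": access_key,
            "AWS_SECRET_ACCESS_KEY": secret_key,
            "AWS_SESSION_TOKEN": session_token,
        }
    )
    cmd = ["aws", "eks", "get-token", "--cluster-name", cluster_name, "--region", region]
    return cmd, env


def run_get_token(cmd: List[str], env: Dict[str, str]) -> str:
    """Run the CLI with stdout and stderr kept separate; return stdout only."""
    result = subprocess.run(cmd, env=env, capture_output=True, text=True, check=True)
    return result.stdout
```

Keeping command construction separate from execution makes it possible to inspect which credentials would be handed to the child process without calling AWS.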
   
   ### Failing DAG
   
   ```python
   from airflow import DAG
   from airflow.providers.amazon.aws.operators.eks import EksPodOperator
   from datetime import datetime
   
   with DAG(
       dag_id="cross_account_eks_test",
       start_date=datetime(2026, 1, 1),
       schedule=None,
       catchup=False,
   ) as dag:
       run_pod = EksPodOperator(
           task_id="eks_pod_test",
           cluster_name="<CLUSTER_NAME>",
           region="eu-west-1",
           aws_conn_id="cluster_connection",
           namespace="<NAMESPACE>",
           image="python:3.9-slim",
           pod_name="cross-account-test",
           cmds=["python", "-c", "print('test')"],
           get_logs=True,
           is_delete_operator_pod=True,
       )
   ```
   
   ### Error
   
   ```
   ApiException: (401)
   Reason: Unauthorized
   HTTP response body: 
{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}
   ```
   
   ### EKS Audit Log
   
   ```json
   {
     "kind": "Event",
     "apiVersion": "audit.k8s.io/v1",
     "level": "Request",
     "requestURI": "/api/v1/namespaces/<ns>/pods?labelSelector=...",
     "verb": "list",
     "user": {},
     "responseStatus": {
       "status": "Failure",
       "message": "Unauthorized",
       "reason": "Unauthorized",
       "code": 401
     }
   }
   ```
   
   The critical detail is `"user":{}` — the EKS API server could not extract 
any identity from the bearer token.
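
One way to check which credentials signed a captured token is to decode it locally. The sketch below is not taken from the provider. It assumes the usual EKS token layout: the `k8s-aws-v1.` prefix followed by a URL-safe base64 encoding (padding stripped) of a presigned STS `GetCallerIdentity` URL. The function names are hypothetical. The decoded URL contains a live signature, so treat it as a secret.

```python
import base64
from typing import Optional
from urllib.parse import parse_qs, urlsplit

TOKEN_PREFIX = "k8s-aws-v1."


def decode_eks_token(token: str) -> str:
    """Return the presigned URL embedded in an EKS bearer token."""
    if not token.startswith(TOKEN_PREFIX):
        raise ValueError("token does not start with 'k8s-aws-v1.'")
    payload = token[len(TOKEN_PREFIX):]
    # Restore the base64 padding that is stripped when the token is built.
    payload += "=" * (-len(payload) % 4)
    return base64.urlsafe_b64decode(payload).decode("utf-8")


def signing_access_key_id(token: str) -> Optional[str]:
    """Return the access key ID from X-Amz-Credential, or None if it is absent."""
    query = parse_qs(urlsplit(decode_eks_token(token)).query)
    credential = query.get("X-Amz-Credential", [None])[0]
    if not credential:
        return None
    # X-Amz-Credential has the form <access-key-id>/<date>/<region>/<service>/aws4_request.
    return credential.split("/", 1)[0]
```

If the access key ID does not match the assumed-role key from the credentials file, the token was signed with some other identity. A missing `X-Amz-Credential` would suggest the URL was not signed at all.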
   
   ### Anything else
   
   The `EksPodOperator.execute()` method correctly retrieves the assumed role 
credentials:
   
   [`EksPodOperator.execute()` — 
operators/eks.py#L1122](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/operators/eks.py#L1122)
   
   ```python
   eks_hook = EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
   session = eks_hook.get_session()
   credentials = session.get_credentials().get_frozen_credentials()
   ```
   
   These credentials are written to a temporary file via 
[`_secure_credential_context()` 
(hooks/eks.py#L547)](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py#L547):
   
   ```
   export AWS_ACCESS_KEY_ID='ASIA...'
   export AWS_SECRET_ACCESS_KEY='...'
   export AWS_SESSION_TOKEN='...'
   ```
   
   The kubeconfig exec plugin then runs the [`COMMAND` shell 
template](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py#L82-L116)
 which:
   1. Sources the credentials file (`. {credentials_file}`)
   2. Calls `python3.x -m airflow.providers.amazon.aws.utils.eks_get_token` 
with `2>&1`
   3. Parses the output using bash string manipulation to extract the token
   
   The [`eks_get_token` module 
(utils/eks_get_token.py#L52)](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/utils/eks_get_token.py#L52)
 creates a plain `boto3.Session(region_name=region_name)`, which should pick up 
the env vars from step 1. However, the `COMMAND` template has several fragility 
points:
   
   1. `export PYTHON_OPERATORS_VIRTUAL_ENV_MODE=1` is set before sourcing 
credentials, which may alter the Python execution environment on MWAA
   2. `2>&1` merges stderr into stdout — any Python warnings, deprecation 
notices, or MWAA-specific log output would be captured in `$output`
   3. The bash parsing (`last_line=${output##*$'\n'}`) extracts the last line, 
but if there are any extra lines of output (warnings, etc.), the parsing could 
extract the wrong content
   4. If the credentials file sourcing fails silently (e.g., file already 
cleaned up by a race condition), `eks_get_token` runs with no credentials and 
produces an unsigned/invalid token
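
Points 2 and 3 can be illustrated without Airflow or AWS. The sketch below does not reproduce the `COMMAND` template. The child process is a hypothetical stand-in that prints a fake token line to stdout and then a warning to stderr, and `last_line` mimics `${output##*$'\n'}` in Python.

```python
import subprocess
import sys

# Hypothetical stand-in for the token helper: token on stdout, then noise on stderr.
CHILD = (
    "import sys\n"
    "print('token: k8s-aws-v1.FAKE', flush=True)\n"
    "print('DeprecationWarning: something noisy', file=sys.stderr, flush=True)\n"
)


def last_line(text: str) -> str:
    """Mimic ${output##*$'\\n'}: keep only the text after the final newline."""
    return text.rsplit("\n", 1)[-1]


def captured_last_line(merge_stderr: bool) -> str:
    """Run the child and return the last line of what was captured from stdout."""
    proc = subprocess.run(
        [sys.executable, "-c", CHILD],
        stdout=subprocess.PIPE,
        # merge_stderr=True corresponds to `2>&1` in the shell template.
        stderr=subprocess.STDOUT if merge_stderr else subprocess.PIPE,
        text=True,
        check=True,
    )
    # Shell command substitution strips trailing newlines before parsing.
    return last_line(proc.stdout.rstrip("\n"))


merged = captured_last_line(merge_stderr=True)
separate = captured_last_line(merge_stderr=False)
print("merged:  ", merged)
print("separate:", separate)
```

With the streams merged, the last line is the warning rather than the token line. With stderr captured separately, the last line is the token line. Whether this is what happens on MWAA is not established here. It only shows that the parsing is sensitive to anything written after the token.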
   
   The kubeconfig is generated by [`generate_config_file()` — 
hooks/eks.py#L595](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py#L595),
 which embeds the `COMMAND` as a kubeconfig exec plugin using 
[`AUTHENTICATION_API_VERSION = "client.authentication.k8s.io/v1"` — 
hooks/eks.py#L39](https://github.com/apache/airflow/blob/main/providers/amazon/src/airflow/providers/amazon/aws/hooks/eks.py#L39).
   
   This does NOT affect same-account usage because in that case the default 
MWAA execution role credentials are already in the environment and 
`eks_get_token` picks them up regardless of whether the credentials file 
sourcing succeeds.
   
   ### Workaround
   
   Currently, the only working approach for cross-account MWAA-to-EKS 
authentication is to generate the kubeconfig manually in a preceding task and 
then run the pod with `KubernetesPodOperator` and `config_file`, bypassing 
`EksPodOperator` entirely.
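
A rough sketch of what generating the kubeconfig manually could look like. This is not the exact code used in the workaround. It assumes the preceding task already has the cluster endpoint and CA data (for example from `describe_cluster`) and a freshly generated bearer token, and it embeds the token directly instead of using an exec plugin. The names `build_kubeconfig` and `dump_kubeconfig`, and the `airflow` user and context name, are hypothetical.

```python
import json
from typing import Any, Dict


def build_kubeconfig(cluster_name: str, endpoint: str, ca_data: str, token: str) -> Dict[str, Any]:
    """Build a kubeconfig structure that authenticates with a static bearer token."""
    return {
        "apiVersion": "v1",
        "kind": "Config",
        "clusters": [
            {
                "name": cluster_name,
                "cluster": {
                    "server": endpoint,
                    # Base64-encoded CA bundle, as returned for the cluster.
                    "certificate-authority-data": ca_data,
                },
            }
        ],
        "users": [{"name": "airflow", "user": {"token": token}}],
        "contexts": [
            {"name": "airflow", "context": {"cluster": cluster_name, "user": "airflow"}}
        ],
        "current-context": "airflow",
    }


def dump_kubeconfig(config: Dict[str, Any]) -> str:
    """Serialize as JSON, which YAML-based kubeconfig loaders can also parse."""
    return json.dumps(config, indent=2)
```

The resulting text would be written to a file readable by the worker, and that path passed as `config_file`. Because EKS tokens are short-lived, the file has to be generated shortly before the pod task runs.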
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [x] I agree to follow this project's [Code of 
Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

