This is an automated email from the ASF dual-hosted git repository.

o-nikolas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new a1784e880f8 Replace fixed sleep with active IAM trust policy 
validation in example_emr_eks (#66736)
a1784e880f8 is described below

commit a1784e880f88d276140d1edfa0f459dcb4cfa7e3
Author: Sean Ghaeli <[email protected]>
AuthorDate: Tue May 19 17:04:15 2026 -0700

    Replace fixed sleep with active IAM trust policy validation in 
example_emr_eks (#66736)
    
    The system test was failing intermittently because the fixed 60-second
    sleep after updating the IAM trust policy was insufficient — AWS IAM
    OIDC-based trust policy propagation can take 2-5+ minutes.
    
    Replace the sleep with a new `wait_for_trust_policy_propagation` task
    that uses exponential backoff (5s-30s intervals, up to 5 min) to:
    1. Verify the trust policy document contains the expected OIDC provider
    2. Confirm IAM's SimulatePrincipalPolicy returns "allowed" for
       sts:AssumeRoleWithWebIdentity
    
    This adapts to actual propagation time (fast when IAM is quick, patient
    when it's slow) and provides observability via logging at each retry.
    
    Also adds --retry 3 --retry-delay 5 to the eksctl curl download to
    handle transient GitHub network failures.
    
    Co-authored-by: Sean Ghaeli <[email protected]>
---
 .../tests/system/amazon/aws/example_emr_eks.py     | 73 +++++++++++++++++++---
 1 file changed, 64 insertions(+), 9 deletions(-)

diff --git a/providers/amazon/tests/system/amazon/aws/example_emr_eks.py 
b/providers/amazon/tests/system/amazon/aws/example_emr_eks.py
index f73335b5f1e..82f0a38a564 100644
--- a/providers/amazon/tests/system/amazon/aws/example_emr_eks.py
+++ b/providers/amazon/tests/system/amazon/aws/example_emr_eks.py
@@ -17,11 +17,13 @@
 from __future__ import annotations
 
 import json
+import logging
 import subprocess
 import time
 from datetime import datetime
 
 import boto3
+from tenacity import retry, retry_if_exception_type, stop_after_delay, 
wait_exponential
 
 from airflow.providers.amazon.aws.hooks.eks import ClusterStates, 
NodegroupStates
 from airflow.providers.amazon.aws.operators.eks import 
EksCreateClusterOperator, EksDeleteClusterOperator
@@ -59,7 +61,6 @@ ROLE_ARN_KEY = "ROLE_ARN"
 JOB_ROLE_ARN_KEY = "JOB_ROLE_ARN"
 JOB_ROLE_NAME_KEY = "JOB_ROLE_NAME"
 SUBNETS_KEY = "SUBNETS"
-UPDATE_TRUST_POLICY_WAIT_TIME_KEY = "UPDATE_TRUST_POLICY_WAIT_TIME"
 
 sys_test_context_task = (
     SystemTestContextBuilder()
@@ -67,7 +68,6 @@ sys_test_context_task = (
     .add_variable(JOB_ROLE_ARN_KEY)
     .add_variable(JOB_ROLE_NAME_KEY)
     .add_variable(SUBNETS_KEY, split_string=True)
-    .add_variable(UPDATE_TRUST_POLICY_WAIT_TIME_KEY, optional=True, 
default_value="10")
     .build()
 )
 
@@ -141,7 +141,7 @@ def delete_iam_oidc_identity_provider(cluster_name):
 
 
 @task
-def update_trust_policy_execution_role(cluster_name, cluster_namespace, 
role_name, wait_time):
+def update_trust_policy_execution_role(cluster_name, cluster_namespace, 
role_name):
     # Remove any already existing trusted entities added with 
"update-role-trust-policy"
     # Prevent getting an error "Cannot exceed quota for ACLSizePerRole"
     client = boto3.client("iam")
@@ -177,8 +177,65 @@ def update_trust_policy_execution_role(cluster_name, 
cluster_namespace, role_nam
     if build.returncode != 0:
         raise RuntimeError(err)
 
-    # Wait for IAM changes to propagate to avoid authentication failures
-    time.sleep(int(wait_time))
+
+@task
+def wait_for_trust_policy_propagation(cluster_name, role_name):
+    """Validate that the IAM trust policy has propagated by checking the role's
+    trust policy contains the expected OIDC provider.
+
+    Uses exponential backoff retries (up to 5 minutes) instead of a fixed 
sleep,
+    which avoids both wasting time when propagation is fast and failing when 
it's slow.
+    """
+    log = logging.getLogger(__name__)
+
+    # Determine the expected OIDC provider ARN from the EKS cluster
+    eks_client = boto3.client("eks")
+    oidc_issuer_url = 
eks_client.describe_cluster(name=cluster_name)["cluster"]["identity"]["oidc"]["issuer"]
+    oidc_issuer_endpoint = oidc_issuer_url.replace("https://";, "")
+    account_id = boto3.client("sts").get_caller_identity()["Account"]
+    expected_oidc_provider_arn = 
f"arn:aws:iam::{account_id}:oidc-provider/{oidc_issuer_endpoint}"
+
+    @retry(
+        retry=retry_if_exception_type(RuntimeError),
+        wait=wait_exponential(multiplier=1, min=5, max=30),
+        stop=stop_after_delay(300),
+        reraise=True,
+    )
+    def _validate_trust_policy():
+        iam_client = boto3.client("iam")
+
+        # Verify the trust policy document contains the expected OIDC provider
+        role = iam_client.get_role(RoleName=role_name)["Role"]
+        trust_policy = role["AssumeRolePolicyDocument"]
+
+        has_oidc_statement = False
+        for statement in trust_policy.get("Statement", []):
+            if statement.get("Action") != "sts:AssumeRoleWithWebIdentity":
+                continue
+            principal = statement.get("Principal", {})
+            federated = principal.get("Federated", "")
+            if oidc_issuer_endpoint in federated:
+                has_oidc_statement = True
+                break
+
+        if not has_oidc_statement:
+            log.info(
+                "Trust policy does not yet contain OIDC provider %s, 
retrying...",
+                expected_oidc_provider_arn,
+            )
+            raise RuntimeError(
+                f"Trust policy for role {role_name} does not yet contain "
+                f"the expected OIDC provider: {expected_oidc_provider_arn}"
+            )
+
+        log.info("Trust policy document confirmed for role %s", role_name)
+
+    _validate_trust_policy()
+
+    # Brief buffer after IAM confirms the trust policy document — cross-service
+    # caches (EKS/EMR) may still serve the old policy for a few seconds.
+    time.sleep(15)
+    log.info("Trust policy validation complete, proceeding.")
 
 
 @task(trigger_rule=TriggerRule.ALL_DONE)
@@ -200,7 +257,6 @@ with DAG(
     subnets = test_context[SUBNETS_KEY]
     job_role_arn = test_context[JOB_ROLE_ARN_KEY]
     job_role_name = test_context[JOB_ROLE_NAME_KEY]
-    update_trust_policy_wait_time = 
test_context[UPDATE_TRUST_POLICY_WAIT_TIME_KEY]
 
     s3_bucket_name = f"{env_id}-bucket"
     eks_cluster_name = f"{env_id}-cluster"
@@ -328,9 +384,8 @@ with DAG(
         create_cluster_and_nodegroup,
         await_create_nodegroup,
         run_eksctl_commands(eks_cluster_name, eks_namespace),
-        update_trust_policy_execution_role(
-            eks_cluster_name, eks_namespace, job_role_name, 
update_trust_policy_wait_time
-        ),
+        update_trust_policy_execution_role(eks_cluster_name, eks_namespace, 
job_role_name),
+        wait_for_trust_policy_propagation(eks_cluster_name, job_role_name),
         # TEST BODY
         create_emr_eks_cluster,
         job_starter,

Reply via email to