ferruzzi commented on code in PR #66736:
URL: https://github.com/apache/airflow/pull/66736#discussion_r3236527205


##########
providers/amazon/tests/system/amazon/aws/example_emr_eks.py:
##########
@@ -177,8 +176,71 @@ def update_trust_policy_execution_role(cluster_name, cluster_namespace, role_nam
     if build.returncode != 0:
         raise RuntimeError(err)
 
-    # Wait for IAM changes to propagate to avoid authentication failures
-    time.sleep(int(wait_time))
+
+class TrustPolicyNotPropagatedError(Exception):
+    """Raised when the IAM trust policy has not yet propagated."""
+
+
+@task
+def wait_for_trust_policy_propagation(cluster_name, role_name):
+    """Validate that the IAM trust policy has propagated by checking the role's
+    trust policy contains the expected OIDC provider.
+
+    Uses exponential backoff retries (up to 5 minutes) instead of a fixed sleep,
+    which avoids both wasting time when propagation is fast and failing when it's slow.
+    """
+    log = logging.getLogger(__name__)
+
+    # Determine the expected OIDC provider ARN from the EKS cluster
+    eks_client = boto3.client("eks")
+    oidc_issuer_url = eks_client.describe_cluster(name=cluster_name)["cluster"]["identity"]["oidc"]["issuer"]
+    oidc_issuer_endpoint = oidc_issuer_url.replace("https://", "")
+    account_id = boto3.client("sts").get_caller_identity()["Account"]
+    expected_oidc_provider_arn = f"arn:aws:iam::{account_id}:oidc-provider/{oidc_issuer_endpoint}"
+
+    @retry(
+        retry=retry_if_exception_type(TrustPolicyNotPropagatedError),
+        wait=wait_exponential(multiplier=1, min=5, max=30),
+        stop=stop_after_delay(300),
+        reraise=True,
+    )
+    def _validate_trust_policy():
+        iam_client = boto3.client("iam")
+
+        # Step 1: Verify the trust policy document contains the expected OIDC provider
+        role = iam_client.get_role(RoleName=role_name)["Role"]
+        trust_policy = role["AssumeRolePolicyDocument"]
+
+        has_oidc_statement = False
+        for statement in trust_policy.get("Statement", []):
+            if statement.get("Action") != "sts:AssumeRoleWithWebIdentity":
+                continue
+            principal = statement.get("Principal", {})
+            federated = principal.get("Federated", "")
+            if oidc_issuer_endpoint in federated:
+                has_oidc_statement = True
+                break
+
+        if not has_oidc_statement:
+            log.info(
+                "Trust policy does not yet contain OIDC provider %s, retrying...",
+                expected_oidc_provider_arn,
+            )
+            raise TrustPolicyNotPropagatedError(
+                f"Trust policy for role {role_name} does not yet contain "
+                f"the expected OIDC provider: {expected_oidc_provider_arn}"
+            )
+
+        log.info("Trust policy document confirmed for role %s", role_name)
+
+    _validate_trust_policy()
+
+    # Brief buffer after IAM confirms the trust policy document — cross-service
+    # caches (EKS/EMR) may still serve the old policy for a few seconds.
+    import time

Review Comment:
   Move the import back to the top?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to