This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7329e9eed9 Fix for `EksCreateClusterOperator` deferrable mode (#36079)
7329e9eed9 is described below

commit 7329e9eed91cc34a03c275dfc9a0fd4c9a0bba6f
Author: Syed Hussain <103602455+syeda...@users.noreply.github.com>
AuthorDate: Thu Dec 7 12:47:12 2023 -0800

    Fix for `EksCreateClusterOperator` deferrable mode (#36079)
---
 airflow/providers/amazon/aws/operators/eks.py | 30 ++++++++++++++++++++-------
 airflow/providers/amazon/aws/triggers/eks.py  | 29 +++++++++++++++++++++++++-
 2 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/airflow/providers/amazon/aws/operators/eks.py 
b/airflow/providers/amazon/aws/operators/eks.py
index 8da9d7d8dd..e82a2a8811 100644
--- a/airflow/providers/amazon/aws/operators/eks.py
+++ b/airflow/providers/amazon/aws/operators/eks.py
@@ -21,6 +21,7 @@ import logging
 import warnings
 from ast import literal_eval
 from datetime import timedelta
+from functools import cached_property
 from typing import TYPE_CHECKING, Any, List, Sequence, cast
 
 from botocore.exceptions import ClientError, WaiterError
@@ -257,6 +258,20 @@ class EksCreateClusterOperator(BaseOperator):
             **kwargs,
         )
 
+    @cached_property
+    def hook(self) -> EksHook:
+        return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
+
+    @property
+    def eks_hook(self):
+        warnings.warn(
+            "`eks_hook` property is deprecated and will be removed in the 
future. "
+            "Please use `hook` property instead.",
+            AirflowProviderDeprecationWarning,
+            stacklevel=2,
+        )
+        return self.hook
+
     def execute(self, context: Context):
         if self.compute:
             if self.compute not in SUPPORTED_COMPUTE_VALUES:
@@ -271,8 +286,7 @@ class EksCreateClusterOperator(BaseOperator):
                         compute=FARGATE_FULL_NAME, 
requirement="fargate_pod_execution_role_arn"
                     )
                 )
-        self.eks_hook = EksHook(aws_conn_id=self.aws_conn_id, 
region_name=self.region)
-        self.eks_hook.create_cluster(
+        self.hook.create_cluster(
             name=self.cluster_name,
             roleArn=self.cluster_role_arn,
             resourcesVpcConfig=self.resources_vpc_config,
@@ -285,7 +299,7 @@ class EksCreateClusterOperator(BaseOperator):
             return None
 
         self.log.info("Waiting for EKS Cluster to provision. This will take 
some time.")
-        client = self.eks_hook.conn
+        client = self.hook.conn
 
         if self.deferrable:
             self.defer(
@@ -307,7 +321,7 @@ class EksCreateClusterOperator(BaseOperator):
             )
         except (ClientError, WaiterError) as e:
             self.log.error("Cluster failed to start and will be torn down.\n 
%s", e)
-            self.eks_hook.delete_cluster(name=self.cluster_name)
+            self.hook.delete_cluster(name=self.cluster_name)
             client.get_waiter("cluster_deleted").wait(
                 name=self.cluster_name,
                 WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": 
self.waiter_max_attempts},
@@ -337,7 +351,7 @@ class EksCreateClusterOperator(BaseOperator):
             raise AirflowException("Trigger error: event is None")
         elif event["status"] == "failed":
             self.log.error("Cluster failed to start and will be torn down.")
-            self.eks_hook.delete_cluster(name=self.cluster_name)
+            self.hook.delete_cluster(name=self.cluster_name)
             self.defer(
                 trigger=EksDeleteClusterTrigger(
                     cluster_name=self.cluster_name,
@@ -382,7 +396,7 @@ class EksCreateClusterOperator(BaseOperator):
                     method_name="execute_complete",
                     timeout=timedelta(seconds=self.waiter_max_attempts * 
self.waiter_delay),
                 )
-            else:
+            elif self.compute == "nodegroup":
                 self.defer(
                     trigger=EksCreateNodegroupTrigger(
                         nodegroup_name=self.nodegroup_name,
@@ -400,9 +414,9 @@ class EksCreateClusterOperator(BaseOperator):
         if event is None:
             self.log.info("Trigger error: event is None")
             raise AirflowException("Trigger error: event is None")
-        elif event["status"] == "delteted":
+        elif event["status"] == "deleted":
             self.log.info("Cluster deleted")
-            raise event["exception"]
+        raise AirflowException("Error creating cluster")
 
     def execute_complete(self, context: Context, event: dict[str, Any] | None 
= None) -> None:
         resource = "fargate profile" if self.compute == "fargate" else 
self.compute
diff --git a/airflow/providers/amazon/aws/triggers/eks.py 
b/airflow/providers/amazon/aws/triggers/eks.py
index eb496acff8..03cef61506 100644
--- a/airflow/providers/amazon/aws/triggers/eks.py
+++ b/airflow/providers/amazon/aws/triggers/eks.py
@@ -19,7 +19,9 @@ from __future__ import annotations
 import warnings
 from typing import TYPE_CHECKING, Any
 
-from airflow.exceptions import AirflowProviderDeprecationWarning
+from botocore.exceptions import ClientError
+
+from airflow.exceptions import AirflowException, 
AirflowProviderDeprecationWarning
 from airflow.providers.amazon.aws.hooks.eks import EksHook
 from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger
 from airflow.providers.amazon.aws.utils.waiter_with_logging import async_wait
@@ -68,6 +70,25 @@ class EksCreateClusterTrigger(AwsBaseWaiterTrigger):
     def hook(self) -> AwsGenericHook:
         return EksHook(aws_conn_id=self.aws_conn_id, 
region_name=self.region_name)
 
+    async def run(self):
+        async with self.hook().async_conn as client:
+            waiter = client.get_waiter(self.waiter_name)
+            try:
+                await async_wait(
+                    waiter,
+                    self.waiter_delay,
+                    self.attempts,
+                    self.waiter_args,
+                    self.failure_message,
+                    self.status_message,
+                    self.status_queries,
+                )
+            except AirflowException as exception:
+                self.log.error("Error creating cluster: %s", exception)
+                yield TriggerEvent({"status": "failed"})
+            else:
+                yield TriggerEvent({"status": "success"})
+
 
 class EksDeleteClusterTrigger(AwsBaseWaiterTrigger):
     """
@@ -125,7 +146,13 @@ class EksDeleteClusterTrigger(AwsBaseWaiterTrigger):
             if self.force_delete_compute:
                 await self.delete_any_nodegroups(client=client)
                 await self.delete_any_fargate_profiles(client=client)
+            try:
                 await client.delete_cluster(name=self.cluster_name)
+            except ClientError as ex:
+                if ex.response.get("Error").get("Code") == 
"ResourceNotFoundException":
+                    pass
+                else:
+                    raise
             await async_wait(
                 waiter=waiter,
                 waiter_delay=int(self.waiter_delay),

Reply via email to