This is an automated email from the ASF dual-hosted git repository. vincbeck pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push: new 7329e9eed9 Fix for `EksCreateClusterOperator` deferrable mode (#36079) 7329e9eed9 is described below commit 7329e9eed91cc34a03c275dfc9a0fd4c9a0bba6f Author: Syed Hussain <103602455+syeda...@users.noreply.github.com> AuthorDate: Thu Dec 7 12:47:12 2023 -0800 Fix for `EksCreateClusterOperator` deferrable mode (#36079) --- airflow/providers/amazon/aws/operators/eks.py | 30 ++++++++++++++++++++------- airflow/providers/amazon/aws/triggers/eks.py | 29 +++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/eks.py b/airflow/providers/amazon/aws/operators/eks.py index 8da9d7d8dd..e82a2a8811 100644 --- a/airflow/providers/amazon/aws/operators/eks.py +++ b/airflow/providers/amazon/aws/operators/eks.py @@ -21,6 +21,7 @@ import logging import warnings from ast import literal_eval from datetime import timedelta +from functools import cached_property from typing import TYPE_CHECKING, Any, List, Sequence, cast from botocore.exceptions import ClientError, WaiterError @@ -257,6 +258,20 @@ class EksCreateClusterOperator(BaseOperator): **kwargs, ) + @cached_property + def hook(self) -> EksHook: + return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region) + + @property + def eks_hook(self): + warnings.warn( + "`eks_hook` property is deprecated and will be removed in the future. " + "Please use `hook` property instead.", + AirflowProviderDeprecationWarning, + stacklevel=2, + ) + return self.hook + def execute(self, context: Context): if self.compute: if self.compute not in SUPPORTED_COMPUTE_VALUES: @@ -271,8 +286,7 @@ class EksCreateClusterOperator(BaseOperator): compute=FARGATE_FULL_NAME, requirement="fargate_pod_execution_role_arn" ) ) - self.eks_hook = EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region) - self.eks_hook.create_cluster( + self.hook.create_cluster( name=self.cluster_name, roleArn=self.cluster_role_arn, resourcesVpcConfig=self.resources_vpc_config, @@ -285,7 +299,7 @@ class EksCreateClusterOperator(BaseOperator): return None self.log.info("Waiting for EKS Cluster to provision. This will take some time.") - client = self.eks_hook.conn + client = self.hook.conn if self.deferrable: self.defer( @@ -307,7 +321,7 @@ class EksCreateClusterOperator(BaseOperator): ) except (ClientError, WaiterError) as e: self.log.error("Cluster failed to start and will be torn down.\n %s", e) - self.eks_hook.delete_cluster(name=self.cluster_name) + self.hook.delete_cluster(name=self.cluster_name) client.get_waiter("cluster_deleted").wait( name=self.cluster_name, WaiterConfig={"Delay": self.waiter_delay, "MaxAttempts": self.waiter_max_attempts}, @@ -337,7 +351,7 @@ class EksCreateClusterOperator(BaseOperator): raise AirflowException("Trigger error: event is None") elif event["status"] == "failed": self.log.error("Cluster failed to start and will be torn down.") - self.eks_hook.delete_cluster(name=self.cluster_name) + self.hook.delete_cluster(name=self.cluster_name) self.defer( trigger=EksDeleteClusterTrigger( cluster_name=self.cluster_name, @@ -382,7 +396,7 @@ class EksCreateClusterOperator(BaseOperator): method_name="execute_complete", timeout=timedelta(seconds=self.waiter_max_attempts * self.waiter_delay), ) - else: + elif self.compute == "nodegroup": self.defer( trigger=EksCreateNodegroupTrigger( nodegroup_name=self.nodegroup_name, @@ -400,9 +414,9 @@ class EksCreateClusterOperator(BaseOperator): if event is None: self.log.info("Trigger error: event is None") raise AirflowException("Trigger error: event is None") - elif event["status"] == "delteted": + elif event["status"] == "deleted": self.log.info("Cluster deleted") - raise event["exception"] + raise AirflowException("Error creating cluster") def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None: resource = "fargate profile" if self.compute == "fargate" else self.compute diff --git a/airflow/providers/amazon/aws/triggers/eks.py b/airflow/providers/amazon/aws/triggers/eks.py index eb496acff8..03cef61506 100644 --- a/airflow/providers/amazon/aws/triggers/eks.py +++ b/airflow/providers/amazon/aws/triggers/eks.py @@ -19,7 +19,9 @@ from __future__ import annotations import warnings from typing import TYPE_CHECKING, Any -from airflow.exceptions import AirflowProviderDeprecationWarning +from botocore.exceptions import ClientError + +from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning from airflow.providers.amazon.aws.hooks.eks import EksHook from airflow.providers.amazon.aws.triggers.base import AwsBaseWaiterTrigger from airflow.providers.amazon.aws.utils.waiter_with_logging import async_wait @@ -68,6 +70,25 @@ class EksCreateClusterTrigger(AwsBaseWaiterTrigger): def hook(self) -> AwsGenericHook: return EksHook(aws_conn_id=self.aws_conn_id, region_name=self.region_name) + async def run(self): + async with self.hook().async_conn as client: + waiter = client.get_waiter(self.waiter_name) + try: + await async_wait( + waiter, + self.waiter_delay, + self.attempts, + self.waiter_args, + self.failure_message, + self.status_message, + self.status_queries, + ) + except AirflowException as exception: + self.log.error("Error creating cluster: %s", exception) + yield TriggerEvent({"status": "failed"}) + else: + yield TriggerEvent({"status": "success"}) + class EksDeleteClusterTrigger(AwsBaseWaiterTrigger): """ @@ -125,7 +146,13 @@ class EksDeleteClusterTrigger(AwsBaseWaiterTrigger): if self.force_delete_compute: await self.delete_any_nodegroups(client=client) await self.delete_any_fargate_profiles(client=client) + try: await client.delete_cluster(name=self.cluster_name) + except ClientError as ex: + if ex.response.get("Error").get("Code") == "ResourceNotFoundException": + pass + else: + raise await async_wait( waiter=waiter, waiter_delay=int(self.waiter_delay),