This is an automated email from the ASF dual-hosted git repository.

vincbeck pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0027d171d90 Add bounded retry cleanup for Redshift cluster deletion after post-creation failure. (#63074)
0027d171d90 is described below

commit 0027d171d908908692f16755a77bc2e4dea42a25
Author: SameerMesiah97 <[email protected]>
AuthorDate: Wed Mar 11 14:15:57 2026 +0000

    Add bounded retry cleanup for Redshift cluster deletion after post-creation failure. (#63074)
    
    Retry deletion when `InvalidClusterState` or `InvalidClusterStateFault` indicates the cluster is still processing another operation, and bound retries using `cleanup_timeout_seconds` (default 300s) to avoid indefinite worker occupation.
    
    Introduce `_attempt_cleanup_with_retry` helper and update existing cleanup logic so the original exception is always re-raised while cleanup failures are logged. Update existing cleanup tests to account for retry behavior and add a new test covering retry on active cluster operations.
---
 .../amazon/aws/operators/redshift_cluster.py       | 73 ++++++++++++++++++++--
 .../amazon/aws/operators/test_redshift_cluster.py  | 66 ++++++++++++++++++-
 2 files changed, 130 insertions(+), 9 deletions(-)

diff --git a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
index 47f4277a600..c050987df1d 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py
@@ -21,7 +21,8 @@ from collections.abc import Sequence
 from datetime import timedelta
 from typing import TYPE_CHECKING, Any
 
-from botocore.exceptions import WaiterError
+from botocore.exceptions import ClientError, WaiterError
+from tenacity import Retrying, retry_if_exception, stop_after_delay, wait_fixed
 
 from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
@@ -110,6 +111,9 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
     :param deferrable: If True, the operator will run in deferrable mode.
     :param delete_cluster_on_failure: If True, best-effort deletion of the redshift cluster will be attempted
         after post-creation failure. Default: True.
+    :param cleanup_timeout_seconds: Maximum time in seconds to attempt
+        best-effort deletion of the cluster when post-creation failure occurs.
+        Default: 300 seconds.
     """
 
     template_fields: Sequence[str] = aws_template_fields(
@@ -193,6 +197,7 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
         poll_interval: int = 60,
         deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
         delete_cluster_on_failure: bool = True,
+        cleanup_timeout_seconds: int = 300,
         **kwargs,
     ):
         super().__init__(**kwargs)
@@ -235,6 +240,65 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
         self.deferrable = deferrable
         self.kwargs = kwargs
         self.delete_cluster_on_failure = delete_cluster_on_failure
+        self.cleanup_timeout_seconds = cleanup_timeout_seconds
+
+    @staticmethod
+    def _retry_if_cluster_busy(exc: BaseException) -> bool:
+        if isinstance(exc, ClientError):
+            return exc.response["Error"]["Code"] in {
+                "InvalidClusterStateFault",
+                "InvalidClusterState",
+            }
+        return False
+
+    def _attempt_cleanup_with_retry(self) -> None:
+        """
+        Attempt bounded best-effort deletion of the cluster.
+
+        This method is only invoked during task failure handling.
+        It does not block until deletion completes and will not
+        mask the original exception.
+        """
+        RETRY_INTERVAL_SECONDS = 60
+
+        retrying = Retrying(
+            retry=retry_if_exception(self._retry_if_cluster_busy),
+            wait=wait_fixed(RETRY_INTERVAL_SECONDS),
+            stop=stop_after_delay(self.cleanup_timeout_seconds),
+            reraise=True,
+        )
+
+        try:
+            for attempt in retrying:
+                with attempt:
+                    self.log.info(
+                        "Attempt %s: Deleting Redshift cluster %s.",
+                        attempt.retry_state.attempt_number,
+                        self.cluster_identifier,
+                    )
+
+                    # Do not wait for deletion to complete; cleanup is best-effort.
+                    self.hook.delete_cluster(cluster_identifier=self.cluster_identifier)
+
+                    self.log.info(
+                        "Successfully initiated deletion of Redshift cluster %s.",
+                        self.cluster_identifier,
+                    )
+
+                    return
+
+        except Exception as e:
+            if self._retry_if_cluster_busy(e):
+                self.log.exception(
+                    "Timed out after %s seconds while trying to delete Redshift cluster %s.",
+                    self.cleanup_timeout_seconds,
+                    self.cluster_identifier,
+                )
+            else:
+                self.log.exception(
+                    "Unexpected error while attempting to delete Redshift cluster %s.",
+                    self.cluster_identifier,
+                )
 
     def execute(self, context: Context):
         self.log.info("Creating Redshift cluster %s", self.cluster_identifier)
@@ -340,13 +404,10 @@ class RedshiftCreateClusterOperator(AwsBaseOperator[RedshiftHook]):
 
                 if self.delete_cluster_on_failure:
                     try:
-                        self.log.warning(
-                            "Attempting deletion of Redshift cluster %s.", self.cluster_identifier
-                        )
-                        self.hook.delete_cluster(cluster_identifier=self.cluster_identifier)
+                        self._attempt_cleanup_with_retry()
                     except Exception:
                         self.log.exception(
-                            "Failed while attempting to delete Reshift cluster %s.",
+                            "Failed while attempting to delete Redshift cluster %s.",
                             self.cluster_identifier,
                         )
             raise
diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py
index d8dfe1841b2..59943cfed29 100644
--- a/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py
+++ b/providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py
@@ -176,6 +176,7 @@ class TestRedshiftCreateClusterOperator:
             cluster_type="single-node",
             wait_for_completion=True,
             delete_cluster_on_failure=True,
+            cleanup_timeout_seconds=300,
         )
 
         with pytest.raises(WaiterError):
@@ -184,10 +185,11 @@ class TestRedshiftCreateClusterOperator:
         # Cluster creation happened.
         mock_conn.create_cluster.assert_called_once()
 
-        # Cleanup attempted.
-        mock_delete_cluster.assert_called_once_with(
+        # Cleanup attempted at least once.
+        mock_delete_cluster.assert_called_with(
             cluster_identifier="test-cluster",
         )
+        assert mock_delete_cluster.call_count >= 1
 
     @mock.patch.object(RedshiftHook, "delete_cluster")
     @mock.patch.object(RedshiftHook, "conn")
@@ -226,6 +228,7 @@ class TestRedshiftCreateClusterOperator:
             cluster_type="single-node",
             wait_for_completion=True,
             delete_cluster_on_failure=True,
+            cleanup_timeout_seconds=300,
         )
 
         with pytest.raises(WaiterError) as exc:
@@ -238,10 +241,67 @@ class TestRedshiftCreateClusterOperator:
         mock_conn.create_cluster.assert_called_once()
 
         # Cleanup attempted despite failure.
-        mock_delete_cluster.assert_called_once_with(
+        mock_delete_cluster.assert_called_with(
             cluster_identifier="test-cluster",
         )
 
+        # Cleanup attempted despite failure.
+        mock_delete_cluster.assert_called_with(
+            cluster_identifier="test-cluster",
+        )
+        assert mock_delete_cluster.call_count >= 1
+
+    @mock.patch("tenacity.nap.time.sleep", mock.MagicMock())
+    @mock.patch.object(RedshiftHook, "delete_cluster")
+    @mock.patch.object(RedshiftHook, "conn")
+    def test_create_cluster_cleanup_retries_on_active_operation(
+        self,
+        mock_conn,
+        mock_delete_cluster,
+    ):
+        # Simulate waiter failure (e.g. DescribeClusters denied).
+        waiter_error = WaiterError(
+            name="ClusterAvailable",
+            reason="AccessDenied for DescribeClusters",
+            last_response={},
+        )
+        mock_conn.get_waiter.return_value.wait.side_effect = waiter_error
+
+        # First deletion attempt fails due to cluster still modifying.
+        active_operation_error = ClientError(
+            error_response={
+                "Error": {
+                    "Code": "InvalidClusterStateFault",
+                    "Message": "Cluster currently modifying",
+                }
+            },
+            operation_name="DeleteCluster",
+        )
+
+        # Second attempt succeeds.
+        mock_delete_cluster.side_effect = [
+            active_operation_error,
+            None,
+        ]
+
+        operator = RedshiftCreateClusterOperator(
+            task_id="task_test",
+            cluster_identifier="test-cluster",
+            node_type="ra3.large",
+            master_username="adminuser",
+            master_user_password="Test123$",
+            cluster_type="single-node",
+            wait_for_completion=True,
+            delete_cluster_on_failure=True,
+            cleanup_timeout_seconds=300,
+        )
+
+        with pytest.raises(WaiterError):
+            operator.execute({})
+
+        # Retry should occur.
+        assert mock_delete_cluster.call_count == 2
+
 
 class TestRedshiftCreateClusterSnapshotOperator:
     @mock.patch.object(RedshiftHook, "cluster_status")

Reply via email to