seanghaeli commented on code in PR #68922:
URL: https://github.com/apache/airflow/pull/68922#discussion_r3467824997


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -872,7 +872,39 @@ def __init__(
         self.deferrable = deferrable
         self.max_attempts = max_attempts
 
+    def _resume_if_paused(self) -> None:
+        """
+        Resume the cluster first if it is paused, so it can be deleted.
+
+        A ``paused`` Redshift cluster cannot be deleted -- ``delete_cluster`` 
raises
+        ``InvalidClusterStateFault`` ("There is an operation running on the 
Cluster") and no
+        amount of retrying helps, because a paused cluster never leaves that 
state on its own.
+        Left unhandled, the cluster is silently leaked (it stays paused 
indefinitely until
+        external cleanup reaps it). Resume it and wait until it is 
``available`` before deleting.
+        """
+        try:
+            cluster_state = 
self.hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        except self.hook.conn.exceptions.ClusterNotFoundFault:
+            return
+
+        if cluster_state != "paused":
+            return
+
+        self.log.info(
+            "Cluster %s is paused; resuming it before deletion (a paused 
cluster cannot be deleted).",
+            self.cluster_identifier,
+        )
+        
self.hook.conn.resume_cluster(ClusterIdentifier=self.cluster_identifier)
+        self.hook.conn.get_waiter("cluster_available").wait(
+            ClusterIdentifier=self.cluster_identifier,
+            WaiterConfig={"Delay": self.poll_interval, "MaxAttempts": 
self.max_attempts},
+        )
+
     def execute(self, context: Context):
+        # A paused cluster cannot be deleted; resume it first (otherwise the 
retry loop below
+        # would exhaust against InvalidClusterStateFault and the cluster would 
be leaked).
+        self._resume_if_paused()

Review Comment:
   Good catch — you're right that resume + delete are two separate AWS calls 
and can't be made transactional, so auto-resuming risks leaving the cluster 
running if the task dies between the two (the inverse surprise of the leak this 
started from).
   
   Made it opt-in via a new `resume_if_paused` flag (default `False`), so 
existing behavior is unchanged. The flag's docstring spells out the 
non-transactional tradeoff, so callers enable it only when leaving a paused 
cluster behind is the worse outcome (e.g. ephemeral test-cluster teardown). 
Pushed.
   
   Drafted-by: Claude Code (Opus via Claude Code) on behalf of Sean Ghaeli



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to