ramitkataria commented on code in PR #68922:
URL: https://github.com/apache/airflow/pull/68922#discussion_r3471000847


##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -871,8 +874,52 @@ def __init__(
         self._attempt_interval = 15
         self.deferrable = deferrable
         self.max_attempts = max_attempts
+        self.resume_if_paused = resume_if_paused
+
+    def _resume_if_paused(self) -> None:
+        """
+        Resume the cluster if it is paused.
+
+        A paused Redshift cluster cannot be deleted. If the cluster is 
currently paused, resume it
+        and wait until it reaches the ``available`` state before continuing.
+        """
+        # Gated behind the opt-in ``resume_if_paused`` flag: resume and delete 
are two separate,
+        # non-transactional AWS calls, so a failure between them would leave 
the cluster running.
+        try:
+            cluster_state = 
self.hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        except self.hook.conn.exceptions.ClusterNotFoundFault:
+            self.log.info(
+                "Cluster %s not found while checking whether resume is 
required.",
+                self.cluster_identifier,
+            )
+            return
+
+        if cluster_state != "paused":
+            self.log.info(
+                "Cluster %s is in state %s; skipping resume.",
+                self.cluster_identifier,
+                cluster_state,
+            )
+            return
+
+        self.log.info(
+            "Cluster %s is paused; resuming it before deletion (a paused 
cluster cannot be deleted).",
+            self.cluster_identifier,
+        )
+        
self.hook.conn.resume_cluster(ClusterIdentifier=self.cluster_identifier)
+        self.hook.conn.get_waiter("cluster_available").wait(
+            ClusterIdentifier=self.cluster_identifier,
+            WaiterConfig={"Delay": self.poll_interval, "MaxAttempts": 
self.max_attempts},
+        )
 
     def execute(self, context: Context):
+        # A paused cluster cannot be deleted; optionally resume it first 
(otherwise the retry loop
+        # below would exhaust against InvalidClusterStateFault and the cluster 
would be leaked).
+        # Opt-in (resume_if_paused) because resume+delete is not transactional 
-- see
+        # _resume_if_paused.
+        if self.resume_if_paused:
+            self._resume_if_paused()

Review Comment:
   Wouldn't this block a worker for up to 15 mins, defeating the purpose of 
deferrable mode?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to