Lee-W commented on code in PR #36416: URL: https://github.com/apache/airflow/pull/36416#discussion_r1437323931
########## airflow/providers/amazon/aws/operators/redshift_cluster.py: ########## @@ -624,25 +637,36 @@ def execute(self, context: Context): else: raise error if self.deferrable: - self.defer( - trigger=RedshiftPauseClusterTrigger( - cluster_identifier=self.cluster_identifier, - waiter_delay=self.poll_interval, - waiter_max_attempts=self.max_attempts, - aws_conn_id=self.aws_conn_id, - ), - method_name="execute_complete", - # timeout is set to ensure that if a trigger dies, the timeout does not restart - # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent) - timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60), - ) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) + if cluster_state == "paused": + self.log.info("Paused cluster successfully") + elif cluster_state == "deleting": + raise AirflowException( + f"Unable to pause cluster since cluster is currently in status: {cluster_state}" + ) + else: Review Comment: same above ########## airflow/providers/amazon/aws/operators/redshift_cluster.py: ########## @@ -714,26 +738,41 @@ def execute(self, context: Context): time.sleep(self._attempt_interval) else: raise + if self.deferrable: - self.defer( - timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60), - trigger=RedshiftDeleteClusterTrigger( - cluster_identifier=self.cluster_identifier, - waiter_delay=self.poll_interval, - waiter_max_attempts=self.max_attempts, - aws_conn_id=self.aws_conn_id, - ), - method_name="execute_complete", - ) + cluster_state = self.redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) + if cluster_state == "cluster_not_found": + self.log.info("Cluster deleted successfully") + elif cluster_state in ("creating", "modifying"): + raise AirflowException( + f"Unable to delete cluster since cluster is currently in status: {cluster_state}" + ) + else: Review Comment: same above ########## airflow/providers/amazon/aws/operators/redshift_cluster.py: ########## @@ -535,19 +535,29 @@ def execute(self, context: Context): time.sleep(self._attempt_interval) else: raise error + if self.deferrable: - self.defer( - trigger=RedshiftResumeClusterTrigger( - cluster_identifier=self.cluster_identifier, - waiter_delay=self.poll_interval, - waiter_max_attempts=self.max_attempts, - aws_conn_id=self.aws_conn_id, - ), - method_name="execute_complete", - # timeout is set to ensure that if a trigger dies, the timeout does not restart - # 60 seconds is added to allow the trigger to exit gracefully (i.e. yield TriggerEvent) - timeout=timedelta(seconds=self.max_attempts * self.poll_interval + 60), - ) + cluster_state = redshift_hook.cluster_status(cluster_identifier=self.cluster_identifier) + if cluster_state == "available": + self.log.info("Resumed cluster successfully") + elif cluster_state == "deleting": + raise AirflowException( + "Unable to resume cluster since cluster is currently in status: %s", cluster_state + ) + else: Review Comment: I think that's something done in https://github.com/apache/airflow/blob/d1e50b48bf57c2c3cffd14cc92ce1f0ecf284a03/airflow/providers/amazon/aws/operators/redshift_cluster.py#L526. What we should do here is not to check whether it's **resumable**, but whether it's finished and that's basically what we do in the waiter. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org