SameerMesiah97 commented on code in PR #68922:
URL: https://github.com/apache/airflow/pull/68922#discussion_r3468490102
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -871,8 +878,50 @@ def __init__(
         self._attempt_interval = 15
         self.deferrable = deferrable
         self.max_attempts = max_attempts
+        self.resume_if_paused = resume_if_paused
+
+    def _resume_if_paused(self) -> None:
+        """
+        Resume the cluster first if it is paused, so it can be deleted.
+
+        A ``paused`` Redshift cluster cannot be deleted -- ``delete_cluster`` raises
+        ``InvalidClusterStateFault`` ("There is an operation running on the Cluster") and no
+        amount of retrying helps, because a paused cluster never leaves that state on its own.
+        Left unhandled, the cluster is silently leaked (it stays paused indefinitely until
+        external cleanup reaps it). Resume it and wait until it is ``available`` before deleting.
+
+        Gated behind ``resume_if_paused`` (opt-in). Resume and delete are two separate AWS calls
+        and cannot be made transactional: if the task fails after the resume succeeds but before
+        the delete does, the cluster is left ``available`` (running) rather than paused -- the
+        inverse surprise. Callers opt in only when leaving a paused cluster behind is the worse
+        outcome (e.g. teardown of an ephemeral test cluster).
+        """
Review Comment:
This docstring is too verbose. I think it should be focused on what the
function does and perhaps a few brief sentences on its behavior. I would
suggest the below instead:
```
"""
Resume the cluster if it is paused.

A paused Redshift cluster cannot be deleted. If the cluster is currently
paused, resume it and wait until it reaches the ``available`` state before
continuing.
"""
```
If you want to document the context behind introducing this new function, I
would add it in a comment (but I think you have already done this well enough
in the `execute` method).
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -871,8 +878,50 @@ def __init__(
         self._attempt_interval = 15
         self.deferrable = deferrable
         self.max_attempts = max_attempts
+        self.resume_if_paused = resume_if_paused
+
+    def _resume_if_paused(self) -> None:
+        """
+        Resume the cluster first if it is paused, so it can be deleted.
+
+        A ``paused`` Redshift cluster cannot be deleted -- ``delete_cluster`` raises
+        ``InvalidClusterStateFault`` ("There is an operation running on the Cluster") and no
+        amount of retrying helps, because a paused cluster never leaves that state on its own.
+        Left unhandled, the cluster is silently leaked (it stays paused indefinitely until
+        external cleanup reaps it). Resume it and wait until it is ``available`` before deleting.
+
+        Gated behind ``resume_if_paused`` (opt-in). Resume and delete are two separate AWS calls
+        and cannot be made transactional: if the task fails after the resume succeeds but before
+        the delete does, the cluster is left ``available`` (running) rather than paused -- the
+        inverse surprise. Callers opt in only when leaving a paused cluster behind is the worse
+        outcome (e.g. teardown of an ephemeral test cluster).
+        """
+        try:
+            cluster_state = self.hook.cluster_status(cluster_identifier=self.cluster_identifier)
+        except self.hook.conn.exceptions.ClusterNotFoundFault:
+            return
Review Comment:
Not sure if a silent return is a good idea. Maybe you could add this log
message before the `return`:
```
self.log.info(
    "Cluster %s not found while checking whether resume is required.",
    self.cluster_identifier,
)
```
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -837,6 +837,12 @@ class RedshiftDeleteClusterOperator(AwsBaseOperator[RedshiftHook]):
     :param poll_interval: Time (in seconds) to wait between two consecutive calls to check cluster state
     :param deferrable: Run operator in the deferrable mode.
     :param max_attempts: (Deferrable mode only) The maximum number of attempts to be made
+    :param resume_if_paused: If True, a ``paused`` cluster is resumed (and waited on until
+        ``available``) before deletion, since a paused cluster cannot be deleted and would
+        otherwise be left behind. Defaults to ``False``. Note that resume and delete are
+        separate, non-transactional AWS calls: if the task fails after resuming but before
+        deleting, the cluster is left running rather than paused. Enable only when leaving a
+        paused cluster behind is the worse outcome (e.g. ephemeral test-cluster teardown).
Review Comment:
This docstring entry is too long. I would suggest the below:
```
:param resume_if_paused: If ``True``, resume a paused cluster and wait for it
    to become ``available`` before deleting it. Defaults to ``False``.
```
##########
providers/amazon/tests/unit/amazon/aws/operators/test_redshift_cluster.py:
##########
@@ -748,6 +748,90 @@ def test_delete_cluster_without_wait_for_completion(self, mock_conn):
         mock_conn.cluster_status.assert_not_called()
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch.object(RedshiftHook, "conn")
+    def test_delete_paused_cluster_resumes_first_when_opted_in(self, mock_conn, mock_cluster_status):
+        """With ``resume_if_paused=True`` a paused cluster is resumed (and waited on) before delete.
+
+        A paused cluster cannot be deleted -- ``delete_cluster`` raises
+        ``InvalidClusterStateFault`` and no retry helps because a paused cluster never resumes on
+        its own, so it would be silently leaked. Opting in resumes it first.
+        """
+        # First lookup (in _resume_if_paused) reports paused.
+        mock_cluster_status.return_value = "paused"
+
+        redshift_operator = RedshiftDeleteClusterOperator(
+            task_id="task_test",
+            cluster_identifier="test_cluster",
+            aws_conn_id="aws_conn_test",
+            wait_for_completion=False,
+            resume_if_paused=True,
+        )
+        redshift_operator.execute(None)
+
+        # Cluster was resumed and waited-on before the delete call.
+        mock_conn.resume_cluster.assert_called_once_with(ClusterIdentifier="test_cluster")
+        mock_conn.get_waiter.assert_called_once_with("cluster_available")
+        mock_conn.get_waiter.return_value.wait.assert_called_once_with(
+            ClusterIdentifier="test_cluster",
+            WaiterConfig={"Delay": 30, "MaxAttempts": 30},
+        )
+        mock_conn.delete_cluster.assert_called_once_with(
+            ClusterIdentifier="test_cluster",
+            SkipFinalClusterSnapshot=True,
+            FinalClusterSnapshotIdentifier="",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch.object(RedshiftHook, "conn")
+    def test_delete_paused_cluster_does_not_resume_by_default(self, mock_conn, mock_cluster_status):
+        """Default behavior is opt-out: a paused cluster is NOT resumed without ``resume_if_paused``.
+
+        Resume and delete are not transactional, so auto-resuming a paused cluster could leave it
+        running if the task fails between the two calls. The default must therefore preserve the
+        prior behavior and never resume on its own.
+        """
+        mock_cluster_status.return_value = "paused"
+
+        redshift_operator = RedshiftDeleteClusterOperator(
+            task_id="task_test",
+            cluster_identifier="test_cluster",
+            aws_conn_id="aws_conn_test",
+            wait_for_completion=False,
+        )
+        redshift_operator.execute(None)
+
+        # No resume attempted; the cluster_status lookup is not even performed.
+        mock_conn.resume_cluster.assert_not_called()
+        mock_cluster_status.assert_not_called()
+        mock_conn.delete_cluster.assert_called_once_with(
+            ClusterIdentifier="test_cluster",
+            SkipFinalClusterSnapshot=True,
+            FinalClusterSnapshotIdentifier="",
+        )
+
+    @mock.patch("airflow.providers.amazon.aws.hooks.redshift_cluster.RedshiftHook.cluster_status")
+    @mock.patch.object(RedshiftHook, "conn")
+    def test_delete_available_cluster_does_not_resume(self, mock_conn, mock_cluster_status):
+        """An already-available cluster is deleted directly, without a spurious resume."""
+        mock_cluster_status.return_value = "available"
+
+        redshift_operator = RedshiftDeleteClusterOperator(
+            task_id="task_test",
+            cluster_identifier="test_cluster",
+            aws_conn_id="aws_conn_test",
+            wait_for_completion=False,
+            resume_if_paused=True,
+        )
+        redshift_operator.execute(None)
+
+        mock_conn.resume_cluster.assert_not_called()
+        mock_conn.delete_cluster.assert_called_once_with(
+            ClusterIdentifier="test_cluster",
+            SkipFinalClusterSnapshot=True,
+            FinalClusterSnapshotIdentifier="",
+        )
+
Review Comment:
Overall, these tests are solid. Again, the docstrings are a bit too long, but
it's not as big of an issue since these are tests, so I would consider fixing
them a non-blocking suggestion. What I would definitely add is a test that
covers this branch in your implementation:
```
try:
    cluster_state = self.hook.cluster_status(cluster_identifier=self.cluster_identifier)
except self.hook.conn.exceptions.ClusterNotFoundFault:
    return
```
In other words, assert that a missing cluster is ignored.
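A minimal sketch of such a test follows. It is written against a standalone stand-in rather than the real operator, so it runs without Airflow installed. The `resume_if_paused` function and the local `ClusterNotFoundFault` class are hypothetical simplifications of the branch quoted above, not the PR's actual code.

```python
from unittest import mock


class ClusterNotFoundFault(Exception):
    """Hypothetical stand-in for the client's ClusterNotFoundFault exception."""


def resume_if_paused(hook, cluster_identifier):
    """Simplified mirror of the branch under review (assumption, not the PR code)."""
    try:
        cluster_state = hook.cluster_status(cluster_identifier=cluster_identifier)
    except hook.conn.exceptions.ClusterNotFoundFault:
        # Missing cluster: nothing to resume.
        return
    if cluster_state != "paused":
        return
    hook.conn.resume_cluster(ClusterIdentifier=cluster_identifier)


def test_missing_cluster_is_ignored():
    hook = mock.MagicMock()
    # ``except`` needs a real exception class, so replace the auto-created mock attribute.
    hook.conn.exceptions.ClusterNotFoundFault = ClusterNotFoundFault
    hook.cluster_status.side_effect = ClusterNotFoundFault("no such cluster")

    # Must not raise, and must not attempt a resume or a wait.
    resume_if_paused(hook, "test_cluster")

    hook.cluster_status.assert_called_once_with(cluster_identifier="test_cluster")
    hook.conn.resume_cluster.assert_not_called()
    hook.conn.get_waiter.assert_not_called()


test_missing_cluster_is_ignored()
```

In the real test module, the same idea would presumably reuse the existing `mock.patch` decorators, setting `mock_cluster_status.side_effect` and asserting that `mock_conn.resume_cluster` is never called.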
##########
providers/amazon/src/airflow/providers/amazon/aws/operators/redshift_cluster.py:
##########
@@ -871,8 +878,50 @@ def __init__(
self._attempt_interval = 15
self.deferrable = deferrable
self.max_attempts = max_attempts
+ self.resume_if_paused = resume_if_paused
+
+ def _resume_if_paused(self) -> None:
+ """
+ Resume the cluster first if it is paused, so it can be deleted.
+
+ A ``paused`` Redshift cluster cannot be deleted -- ``delete_cluster``
raises
+ ``InvalidClusterStateFault`` ("There is an operation running on the
Cluster") and no
+ amount of retrying helps, because a paused cluster never leaves that
state on its own.
+ Left unhandled, the cluster is silently leaked (it stays paused
indefinitely until
+ external cleanup reaps it). Resume it and wait until it is
``available`` before deleting.
+
+ Gated behind ``resume_if_paused`` (opt-in). Resume and delete are two
separate AWS calls
+ and cannot be made transactional: if the task fails after the resume
succeeds but before
+ the delete does, the cluster is left ``available`` (running) rather
than paused -- the
+ inverse surprise. Callers opt in only when leaving a paused cluster
behind is the worse
+ outcome (e.g. teardown of an ephemeral test cluster).
+ """
+ try:
+ cluster_state =
self.hook.cluster_status(cluster_identifier=self.cluster_identifier)
+ except self.hook.conn.exceptions.ClusterNotFoundFault:
+ return
+
+ if cluster_state != "paused":
Review Comment:
Same here: I would log before returning instead of returning silently. I would suggest the below:
```
self.log.info(
    "Cluster %s is in state %s; skipping resume.",
    self.cluster_identifier,
    cluster_state,
)
```
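For reference, here is a rough sketch of how the helper might read with both suggested log messages in place. It is a standalone function rather than the operator method, and the boolean return value is an assumption added only to make the outcome easy to check. The resume and waiter calls are inferred from the assertions in the PR's tests, not from the implementation itself.

```python
import logging
from unittest import mock

log = logging.getLogger(__name__)


def resume_if_paused(hook, cluster_identifier: str) -> bool:
    """Resume a paused cluster; return True only if a resume was issued.

    Hypothetical standalone form: the PR's method returns None and logs
    through ``self.log``.
    """
    try:
        cluster_state = hook.cluster_status(cluster_identifier=cluster_identifier)
    except hook.conn.exceptions.ClusterNotFoundFault:
        log.info(
            "Cluster %s not found while checking whether resume is required.",
            cluster_identifier,
        )
        return False

    if cluster_state != "paused":
        log.info(
            "Cluster %s is in state %s; skipping resume.",
            cluster_identifier,
            cluster_state,
        )
        return False

    # Calls and waiter settings mirror what the PR's tests assert on.
    hook.conn.resume_cluster(ClusterIdentifier=cluster_identifier)
    hook.conn.get_waiter("cluster_available").wait(
        ClusterIdentifier=cluster_identifier,
        WaiterConfig={"Delay": 30, "MaxAttempts": 30},
    )
    return True


# Usage with a mocked hook: an available cluster is left alone.
hook = mock.MagicMock()
hook.conn.exceptions.ClusterNotFoundFault = type("ClusterNotFoundFault", (Exception,), {})
hook.cluster_status.return_value = "available"
assert resume_if_paused(hook, "test_cluster") is False
hook.conn.resume_cluster.assert_not_called()
```

With this shape, both early-return paths leave a trace in the task log, which should make it easier to tell a skipped resume from one that never ran.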