eladkal commented on code in PR #38287:
URL: https://github.com/apache/airflow/pull/38287#discussion_r1607748032


##########
airflow/providers/amazon/aws/operators/neptune.py:
##########
@@ -32,6 +36,50 @@
     from airflow.utils.context import Context
 
 
+def handle_waitable_exception(
+    operator: NeptuneStartDbClusterOperator | NeptuneStopDbClusterOperator, 
err: str
+):
+    """Handle client exceptions for invalid cluster or invalid instance status 
that are temporary.
+
+    After status change, its possible to retry. Waiter will handle terminal 
status.
+    """
+    code = err
+
+    if code in ("InvalidDBInstanceStateFault", "InvalidDBInstanceState"):
+        if operator.deferrable:
+            operator.log.info("Deferring until instances become available: 
%s", operator.cluster_id)
+            operator.defer(
+                trigger=NeptuneClusterInstancesAvailableTrigger(
+                    aws_conn_id=operator.aws_conn_id,
+                    db_cluster_id=operator.cluster_id,
+                    region_name=operator.region_name,
+                    botocore_config=operator.botocore_config,
+                    verify=operator.verify,
+                ),
+                method_name="execute",
+            )
+        else:
+            operator.log.info("Need to wait for instances to become available: 
%s", operator.cluster_id)
+            
operator.hook.wait_for_cluster_instance_availability(cluster_id=operator.cluster_id)
+    if code in ["InvalidClusterState", "InvalidDBClusterStateFault"]:
+        if operator.deferrable:
+            operator.log.info("Deferring until cluster becomes available: %s", 
operator.cluster_id)
+            operator.defer(
+                trigger=NeptuneClusterAvailableTrigger(
+                    aws_conn_id=operator.aws_conn_id,
+                    db_cluster_id=operator.cluster_id,
+                    region_name=operator.region_name,
+                    botocore_config=operator.botocore_config,
+                    verify=operator.verify,
+                ),
+                method_name="execute",
+            )
+        else:
+            operator.log.info("Need to wait for cluster to become available: 
%s", operator.cluster_id)
+            operator.hook.wait_for_cluster_availability(operator.cluster_id)
+
+
+

Review Comment:
   ```suggestion
   ```
   fix static checks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to