amogh-jahagirdar commented on code in PR #7198:
URL: https://github.com/apache/iceberg/pull/7198#discussion_r1156739798


##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);

Review Comment:
   Should this be at `warn` level instead? If the commit did end up succeeding, logging at `error` level may mislead people. Also, a nit on the message: maybe "Received unexpected failure when committing, validating if commit ended up succeeding".
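A minimal sketch of the suggested change, assuming the log call simply moves from `error` to `warn` with the reworded message. To stay dependency-free it uses `java.util.logging` as a stand-in for the SLF4J logger in the PR, so `Level.WARNING` here plays the role of `LOG.warn(...)` there. The class and method names (`CommitLogging`, `logUnexpectedPersistFailure`) are hypothetical.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: java.util.logging stands in for the SLF4J logger used in the PR,
// so Level.WARNING here corresponds to LOG.warn(...) there.
final class CommitLogging {
  private static final Logger LOG = Logger.getLogger(CommitLogging.class.getName());

  private CommitLogging() {}

  // Log at warning rather than error level: the commit may still have succeeded,
  // which is only known after the commit status has been reconciled.
  static void logUnexpectedPersistFailure(String fullTableName, RuntimeException persistFailure) {
    LOG.log(
        Level.WARNING,
        "Received unexpected failure when committing to "
            + fullTableName
            + ", validating if commit ended up succeeding",
        persistFailure);
  }
}
```

The exception is still attached to the record, so the stack trace stays available; only the severity and wording change, and the subsequent `checkCommitStatus` reconciliation is unaffected.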



##########
aws/src/main/java/org/apache/iceberg/aws/util/RetryDetector.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.aws.util;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import software.amazon.awssdk.core.metrics.CoreMetric;
+import software.amazon.awssdk.metrics.MetricCollection;
+import software.amazon.awssdk.metrics.MetricPublisher;
+
+/**
+ * Metrics are the only reliable way provided by the AWS SDK to determine if an API call was
+ * retried. This class can be attached to an AWS API call and checked after to determine if retries
+ * occurred.
+ */
+public class RetryDetector implements MetricPublisher {

Review Comment:
   Could we avoid making this public for now? Since it's used within the `TableOperations` implementations, we should be able to make it package-private. If there's a desire to expose it outside of the AWS module later, we can make it public then.
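To illustrate the detection idea from the class Javadoc, here is a dependency-free sketch of a package-private detector. The real class implements the AWS SDK's `MetricPublisher` and receives `MetricCollection`s; here a hypothetical `MetricNode` type stands in for `MetricCollection`, holding retry counts (the role played by `CoreMetric.RETRY_COUNT`) and child collections. The recursive walk is an assumption about how such a detector could work, not a copy of the PR's code.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the SDK's MetricCollection: retry counts plus child collections.
final class MetricNode {
  final List<Integer> retryCounts;
  final List<MetricNode> children;

  MetricNode(List<Integer> retryCounts, List<MetricNode> children) {
    this.retryCounts = retryCounts;
    this.children = children;
  }
}

// No public modifier: package-private, as suggested in the review.
class RetryDetector {
  private final AtomicBoolean retried = new AtomicBoolean(false);

  // Plays the role of MetricPublisher.publish: walk the metric tree and
  // latch the flag once any positive retry count is seen.
  void publish(MetricNode metrics) {
    if (retried.get()) {
      return;
    }
    if (metrics.retryCounts.stream().anyMatch(count -> count > 0)) {
      retried.set(true);
    } else {
      metrics.children.forEach(this::publish);
    }
  }

  // Checked by the caller after the API call has completed or failed.
  boolean retried() {
    return retried.get();
  }
}
```

Note that package-private access only reaches classes in the same Java package, so this visibility assumes the detector sits alongside the `TableOperations` classes that use it; in the diff it lives in `org.apache.iceberg.aws.util`, while its callers are in the `dynamodb` and `glue` packages.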



##########
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java:
##########
@@ -154,47 +156,29 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       Table glueTable = getGlueTable();
       checkMetadataLocation(glueTable, base);
       Map<String, String> properties = prepareProperties(glueTable, newMetadataLocation);
-      persistGlueTable(glueTable, properties, metadata);
+      persistGlueTable(glueTable, properties, metadata, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
     } catch (CommitFailedException e) {
       throw e;
-    } catch (ConcurrentModificationException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s because Glue detected concurrent update", tableName());
-    } catch (software.amazon.awssdk.services.glue.model.AlreadyExistsException e) {
-      throw new AlreadyExistsException(
-          e,
-          "Cannot commit %s because its Glue table already exists when trying to create one",
-          tableName());
-    } catch (EntityNotFoundException e) {
-      throw new NotFoundException(
-          e, "Cannot commit %s because Glue cannot find the requested entity", tableName());
-    } catch (AccessDeniedException e) {
-      throw new ForbiddenException(
-          e, "Cannot commit %s because Glue cannot access the requested resources", tableName());
-    } catch (software.amazon.awssdk.services.glue.model.ValidationException e) {
-      throw new ValidationException(
-          e,
-          "Cannot commit %s because Glue encountered a validation exception "
-              + "while accessing requested resources",
-          tableName());
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
+      boolean isAwsServiceException = persistFailure instanceof AwsServiceException;
 
-      if (persistFailure instanceof AwsServiceException) {
-        int statusCode = ((AwsServiceException) persistFailure).statusCode();
-        if (statusCode >= 500 && statusCode < 600) {
-          commitStatus = CommitStatus.FAILURE;
-        } else {
-          throw persistFailure;
-        }
-      } else {
+      // If we got an exception we weren't expecting, or we got an AWS service exception
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!isAwsServiceException || retryDetector.retried()) {
+        LOG.error(

Review Comment:
   Same as above regarding the `warn` level and the log message.



##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);
+        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      }
+
+      // If we got ConditionalCheckFailedException, but find we
+      // succeeded on a retry that threw an exception, skip this exception.
+      if (commitStatus != CommitStatus.SUCCESS && conditionCheckFailed) {
+        throw new CommitFailedException(
+            persistFailure, "Cannot commit %s: concurrent update detected", tableName());
+      }

Review Comment:
   Shouldn't we still throw if the commit status is unknown?



##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);
+        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      }
+
+      // If we got ConditionalCheckFailedException, but find we
+      // succeeded on a retry that threw an exception, skip this exception.

Review Comment:
   Nit: I get what the inline comment is going for, and it's true, but I would probably leave it out since it doesn't directly correspond to the block below. Alternatively, the comment could be something like "Only throw after checking the commit status with consideration to retries, and if the condition check failed".
   



##########
aws/src/main/java/org/apache/iceberg/aws/dynamodb/DynamoDbTableOperations.java:
##########
@@ -116,20 +118,30 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
                   .build());
       checkMetadataLocation(table, base);
       Map<String, String> properties = prepareProperties(table, newMetadataLocation);
-      persistTable(tableKey, table, properties);
+      persistTable(tableKey, table, properties, retryDetector);
       commitStatus = CommitStatus.SUCCESS;
-    } catch (ConditionalCheckFailedException e) {
-      throw new CommitFailedException(
-          e, "Cannot commit %s: concurrent update detected", tableName());
     } catch (CommitFailedException e) {
       // any explicit commit failures are passed up and out to the retry handler
       throw e;
     } catch (RuntimeException persistFailure) {
-      LOG.error(
-          "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
-          fullTableName,
-          persistFailure);
-      commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      boolean conditionCheckFailed = persistFailure instanceof ConditionalCheckFailedException;
+
+      // If we got an exception we weren't expecting, or we got a ConditionalCheckFailedException
+      // but retries were performed, attempt to reconcile the actual commit status.
+      if (!conditionCheckFailed || retryDetector.retried()) {
+        LOG.error(
+            "Confirming if commit to {} indeed failed to persist, attempting to reconnect and check.",
+            fullTableName,
+            persistFailure);
+        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+      }
+
+      // If we got ConditionalCheckFailedException, but find we
+      // succeeded on a retry that threw an exception, skip this exception.
+      if (commitStatus != CommitStatus.SUCCESS && conditionCheckFailed) {
+        throw new CommitFailedException(
+            persistFailure, "Cannot commit %s: concurrent update detected", tableName());
+      }

Review Comment:
   Ah, never mind, that's still done below. Okay, cool!
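The resolved question can be summarized as a small decision table. The sketch below restates the two conditions from the DynamoDB diff as pure functions; `CommitStatus` is a local stand-in for the enum used in the PR, and `DynamoCommitDecision` and its method names are hypothetical. It shows that a `ConditionalCheckFailedException` still leads to a `CommitFailedException` when the reconciled status is `UNKNOWN`; it is skipped only when the status is `SUCCESS`.

```java
// Local stand-in for the CommitStatus enum used in the PR.
enum CommitStatus {
  SUCCESS,
  FAILURE,
  UNKNOWN
}

// Hypothetical helper restating the two conditions from the DynamoDB diff.
final class DynamoCommitDecision {
  private DynamoCommitDecision() {}

  // Reconcile via checkCommitStatus() if the exception was unexpected, or if a
  // ConditionalCheckFailedException arrived after the SDK retried the request.
  static boolean shouldCheckStatus(boolean conditionCheckFailed, boolean retried) {
    return !conditionCheckFailed || retried;
  }

  // Only throw after checking the commit status with consideration to retries,
  // and if the condition check failed. Both UNKNOWN and FAILURE throw.
  static boolean shouldThrowCommitFailed(CommitStatus status, boolean conditionCheckFailed) {
    return status != CommitStatus.SUCCESS && conditionCheckFailed;
  }
}
```

For example, a conditional check failure without retries never calls `checkCommitStatus`; assuming `commitStatus` starts out as `FAILURE` (its initialization is not shown in the diff), the `CommitFailedException` is thrown exactly as before the change.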



##########
aws/src/main/java/org/apache/iceberg/aws/glue/GlueTableOperations.java:
##########
@@ -340,6 +330,39 @@ void persistGlueTable(Table glueTable, Map<String, String> parameters, TableMeta
     }
   }
 
+  private void handleAWSExceptions(AwsServiceException persistFailure) {
+    int statusCode = persistFailure.statusCode();

Review Comment:
   Very minor nit: for readability, I generally try to keep variables as close as possible to where they are read. I think we can move the status code into the `else` block, or add a separate helper method like `shouldThrow(AwsServiceException)` that just returns a result based on the criteria.
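A sketch of the suggested helper, assuming it keeps the criteria from the Glue code removed in this diff (a 5xx response is treated as a commit failure, anything else is rethrown). To stay free of the AWS SDK it takes the status code that `AwsServiceException.statusCode()` would return; the name `shouldThrow` comes from the review, but this signature and the `AwsFailureHandling` class are hypothetical.

```java
// Hypothetical helper following the review suggestion; it takes the HTTP status code
// (what AwsServiceException.statusCode() returns) so the sketch needs no AWS SDK.
final class AwsFailureHandling {
  private AwsFailureHandling() {}

  // Criteria from the removed Glue code: a 5xx response is recorded as a commit
  // failure, while any other status code is rethrown to the caller.
  static boolean shouldThrow(int statusCode) {
    boolean isServerError = statusCode >= 500 && statusCode < 600;
    return !isServerError;
  }
}
```

At the call site the status code is then read only where it is used, for example `if (shouldThrow(persistFailure.statusCode())) { throw persistFailure; }`, which addresses the readability point without a separate local variable.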



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
