phet commented on code in PR #3976:
URL: https://github.com/apache/gobblin/pull/3976#discussion_r1644032306


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -85,11 +91,20 @@ public void execute() throws IOException {
       if (!isJustPriorDestMetadataStillCurrent) {
         throw new IOException("error: likely concurrent writing to 
destination: " + determinationMsg);
       }
-      destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
currentDestMetadata);
+      Retryer<Void> registerRetryer = RetryerBuilder.<Void>newBuilder()
+          .retryIfException()
+          
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_NUMBER_OF_ATTEMPTS)).build();
+      registerRetryer.call(() -> {
+        destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
currentDestMetadata);
+        return null;
+      });
     } catch (IcebergTable.TableNotFoundException tnfe) {
       String msg = "Destination table (with TableMetadata) does not exist: " + 
tnfe.getMessage();
       log.error(msg);
       throw new IOException(msg, tnfe);
+    } catch (ExecutionException | RetryException e) {

Review Comment:
   see the comment on line 90 about:
   > org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: stale 
table metadata
   
   we should probably detect that one and give up retrying (potentially with a 
clearer error message).
   
   moreover, `ExecutionException` and `RetryException` are separate 
circumstances and probably shouldn't be clubbed into the same handler.  see 
[one example 
here](https://github.com/apache/gobblin/blob/0d4ee241bef8d68f257f91ebd878e0b39698f68c/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java#L248).
   
   and, do unwrap before re-throwing.



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -85,11 +91,20 @@ public void execute() throws IOException {
       if (!isJustPriorDestMetadataStillCurrent) {
         throw new IOException("error: likely concurrent writing to 
destination: " + determinationMsg);
       }
-      destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, 
currentDestMetadata);
+      Retryer<Void> registerRetryer = RetryerBuilder.<Void>newBuilder()
+          .retryIfException()
+          
.withStopStrategy(StopStrategies.stopAfterAttempt(MAX_NUMBER_OF_ATTEMPTS)).build();

Review Comment:
   in general, we try to avoid hard-coding config into our jobs.  see 
`KafkaJobStatusMonitor` for [an 
example](https://github.com/apache/gobblin/blob/0d4ee241bef8d68f257f91ebd878e0b39698f68c/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java#L138)
 of using a config prefix, then *falling back to* some hard-coded defaults.  in 
addition, a `RetryListener` may be helpful toward leaving an indication in the 
logs of why we're retrying



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to