phet commented on code in PR #3976:
URL: https://github.com/apache/gobblin/pull/3976#discussion_r1644032306
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -85,11 +91,20 @@ public void execute() throws IOException {
if (!isJustPriorDestMetadataStillCurrent) {
throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
}
- destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, currentDestMetadata);
+ Retryer<Void> registerRetryer = RetryerBuilder.<Void>newBuilder()
+ .retryIfException()
+ .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_NUMBER_OF_ATTEMPTS)).build();
+ registerRetryer.call(() -> {
+ destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, currentDestMetadata);
+ return null;
+ });
} catch (IcebergTable.TableNotFoundException tnfe) {
String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
log.error(msg);
throw new IOException(msg, tnfe);
+ } catch (ExecutionException | RetryException e) {
Review Comment:
see the comment on line 90 about:
> org.apache.iceberg.exceptions.CommitFailedException: Cannot commit: stale table metadata
we should probably detect that one and give up retrying right away (potentially
with a clearer error message), since a retry against the same stale metadata is
unlikely to succeed.
moreover, `ExecutionException` and `RetryException` signal separate
circumstances and probably shouldn't be lumped into the same handler. see
[one example here](https://github.com/apache/gobblin/blob/0d4ee241bef8d68f257f91ebd878e0b39698f68c/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java#L248).
also, do unwrap the underlying cause before re-throwing.
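
to make the suggestion concrete, here is a minimal sketch of the intended control flow. it deliberately uses only the JDK rather than the `Retryer` from the diff, and every name in it (`RegisterRetrySketch`, `StaleMetadataException`, `callWithRetries`) is hypothetical: `StaleMetadataException` merely stands in for iceberg's `CommitFailedException`. the point is the shape: give up at once on the stale-metadata case, keep "retries exhausted" as a separate path, and attach the underlying cause rather than a wrapper.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical sketch only: not the Gobblin, Iceberg, or guava-retrying API.
class RegisterRetrySketch {

  /** Stand-in for org.apache.iceberg.exceptions.CommitFailedException (assumption). */
  static class StaleMetadataException extends RuntimeException {
    StaleMetadataException(String msg) {
      super(msg);
    }
  }

  /** Runs the action up to maxAttempts times, but stops immediately on stale metadata. */
  static <T> T callWithRetries(Callable<T> action, int maxAttempts) throws IOException {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    Exception lastFailure = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (StaleMetadataException sme) {
        // Non-retryable: the metadata we read is no longer current, so stop here.
        throw new IOException("giving up on attempt " + attempt
            + ": destination metadata is stale (likely concurrent write)", sme);
      } catch (Exception e) {
        // Presumed transient: remember the failure and try again.
        lastFailure = e;
      }
    }
    // Retries exhausted: report the last underlying failure as the cause.
    throw new IOException("failed after " + maxAttempts + " attempts", lastFailure);
  }

  public static void main(String[] args) throws IOException {
    int[] calls = {0};
    String result = callWithRetries(() -> {
      if (++calls[0] < 3) {
        throw new IllegalStateException("transient failure");
      }
      return "registered";
    }, 5);
    System.out.println(result + " after " + calls[0] + " attempts");
  }
}
```

with the actual `Retryer`, the equivalent would presumably be a narrower retry predicate plus two separate `catch` blocks, each re-throwing with the unwrapped cause.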
##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -85,11 +91,20 @@ public void execute() throws IOException {
if (!isJustPriorDestMetadataStillCurrent) {
throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
}
- destIcebergTable.registerIcebergTable(readTimeSrcTableMetadata, currentDestMetadata);
+ Retryer<Void> registerRetryer = RetryerBuilder.<Void>newBuilder()
+ .retryIfException()
+ .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_NUMBER_OF_ATTEMPTS)).build();
Review Comment:
in general, we try to avoid hard-coding config into our jobs. see
`KafkaJobStatusMonitor` for [an example](https://github.com/apache/gobblin/blob/0d4ee241bef8d68f257f91ebd878e0b39698f68c/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/KafkaJobStatusMonitor.java#L138)
of reading settings under a config prefix, then *falling back to* some
hard-coded defaults. in addition, a `RetryListener` may help by leaving an
indication in the logs of why we're retrying.
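
as a rough illustration of both points, here is a JDK-only sketch. the key prefix `example.register.retry.`, the default of 3, and the `AttemptListener` interface are all made up for this example; the real code would presumably use the job's own config mechanism and the retry library's listener type. it shows a prefixed setting that falls back to a hard-coded default, and a per-attempt callback that records why a retry is happening.

```java
import java.util.Properties;
import java.util.concurrent.Callable;

// Hypothetical sketch only: key names, defaults, and the listener type are assumptions.
class RetryConfigSketch {

  static final String CONFIG_PREFIX = "example.register.retry.";
  static final int DEFAULT_MAX_ATTEMPTS = 3;

  /** Callback invoked after each failed attempt, e.g. to log the reason for retrying. */
  interface AttemptListener {
    void onFailedAttempt(int attemptNumber, Exception cause);
  }

  /** Reads the prefixed setting if present, otherwise falls back to the default. */
  static int resolveMaxAttempts(Properties props) {
    String raw = props.getProperty(CONFIG_PREFIX + "maxAttempts");
    return raw == null ? DEFAULT_MAX_ATTEMPTS : Integer.parseInt(raw.trim());
  }

  /** Retries the action, notifying the listener of every failure before trying again. */
  static <T> T call(Callable<T> action, int maxAttempts, AttemptListener listener)
      throws Exception {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        return action.call();
      } catch (Exception e) {
        last = e;
        listener.onFailedAttempt(attempt, e);
      }
    }
    // Either every attempt failed, or maxAttempts was below 1 and nothing ran.
    throw last != null ? last : new IllegalArgumentException("maxAttempts must be at least 1");
  }

  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty(CONFIG_PREFIX + "maxAttempts", "4");
    int[] calls = {0};
    String result = call(
        () -> {
          if (++calls[0] < 2) {
            throw new IllegalStateException("catalog temporarily unavailable");
          }
          return "registered";
        },
        resolveMaxAttempts(props),
        (n, cause) -> System.err.println("attempt " + n + " failed, will retry: " + cause));
    System.out.println(result);
  }
}
```

the listener keeps logging out of the retry loop itself, so the same loop can be reused with different reporting.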