[https://issues.apache.org/jira/browse/GOBBLIN-2088?focusedWorklogId=924005&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-924005]

ASF GitHub Bot logged work on GOBBLIN-2088:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jun/24 14:28
            Start Date: 21/Jun/24 14:28
    Worklog Time Spent: 10m 
      Work Description: phet commented on code in PR #3976:
URL: https://github.com/apache/gobblin/pull/3976#discussion_r1648405152


##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -54,7 +66,12 @@ public class IcebergRegisterStep implements CommitStep {
   private final TableMetadata readTimeSrcTableMetadata;
   private final TableMetadata justPriorDestTableMetadata;
   private final Properties properties;
-  private final Integer MAX_NUMBER_OF_ATTEMPTS = 3;
+  public static final String ICEBERG_REGISTER_STEP_PREFIX = "icebergRegisterStep";

Review Comment:
   "fully qualify" this by prefixing w/ 
`IcebergDatasetFinder.ICEBERG_DATASET_PREFIX`.
   
   then name it more semantically, like:
   ```
   RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";
   ```
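For illustration, here is a stdlib-only sketch of the suggested naming. It assumes `IcebergDatasetFinder.ICEBERG_DATASET_PREFIX` resolves to `"iceberg.dataset"` (not confirmed here) and uses a hypothetical `retry_times` sub-key. It shows why the appended suffix needs its leading `.`: without it, the two parts fuse into `iceberg.datasetcatalog...`.

```java
import java.util.Properties;

// Hypothetical sketch: the constant values and the "retry_times" sub-key are assumptions.
class RetryerConfigKeys {
  // ASSUMPTION: stands in for IcebergDatasetFinder.ICEBERG_DATASET_PREFIX
  static final String ICEBERG_DATASET_PREFIX = "iceberg.dataset";

  // The leading "." keeps the suffix from fusing with the dataset prefix.
  static final String RETRYER_CONFIG_PREFIX = ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";

  // ASSUMPTION: illustrative sub-key for the maximum number of attempts
  static final String RETRY_TIMES_KEY = RETRYER_CONFIG_PREFIX + ".retry_times";

  /** Reads the attempt count from the given properties, falling back to a default. */
  static int retryTimes(Properties props, int defaultTimes) {
    String raw = props.getProperty(RETRY_TIMES_KEY);
    return raw == null ? defaultTimes : Integer.parseInt(raw.trim());
  }
}
```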



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java:
##########
@@ -75,6 +75,48 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
     }
   }
 
+  @Test
+  public void testRegisterIcebergTableWithRetryer() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(mockTable).registerIcebergTable(any(), any());

Review Comment:
   nit: for clarity, put each of the `.doX` behaviors on its own line.  also 
add a comment above to the effect of "fail the first two times, then succeed"
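The behavior this comment asks to document ("fail the first two times, then succeed") can be reproduced without Mockito. The stdlib-only sketch below uses a hypothetical `FlakyRegistrar` fake in place of the mocked `IcebergTable`, and a minimal `SimpleRetry` loop in place of the guava-retrying `Retryer`. It then checks that exactly three calls were made, mirroring `Mockito.times(3)`.

```java
// Hypothetical stand-ins: FlakyRegistrar replaces the mocked IcebergTable,
// SimpleRetry replaces the guava-retrying Retryer used by IcebergRegisterStep.
class FlakyRegistrar {
  private final int failuresBeforeSuccess;
  int calls = 0;

  FlakyRegistrar(int failuresBeforeSuccess) {
    this.failuresBeforeSuccess = failuresBeforeSuccess;
  }

  void register() {
    calls++;
    // fail the first `failuresBeforeSuccess` times, then succeed
    if (calls <= failuresBeforeSuccess) {
      throw new RuntimeException("simulated registration failure #" + calls);
    }
  }
}

class SimpleRetry {
  /** Runs op up to maxAttempts times, rethrowing the last failure if every attempt fails. */
  static void run(Runnable op, int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    RuntimeException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        op.run();
        return; // success: stop retrying
      } catch (RuntimeException e) {
        last = e;
      }
    }
    throw last;
  }
}
```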



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java:
##########
@@ -75,6 +75,48 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
     }
   }
 
+  @Test
+  public void testRegisterIcebergTableWithRetryer() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws IOException {
+            return mockSingleTableIcebergCatalog(mockTable);
+          }
+        };
+    try {
+      regStep.execute();
+      Mockito.verify(mockTable, Mockito.times(3)).registerIcebergTable(any(), any());
+    } catch (RuntimeException re) {
+      Assert.fail("Got Unexpected Runtime Exception", re);
+    }
+  }
+
+  @Test
+  public void testRegisterIcebergTableWithRetryerThrowsRuntimeException() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws IOException {
+            return mockSingleTableIcebergCatalog(mockTable);
+          }
+        };
+    try {
+      regStep.execute();
+      Assert.fail("Expected Runtime Exception");
+    } catch (RuntimeException re) {
+      Assert.assertTrue(re.getMessage().startsWith("Failed to register iceberg table (retried"), re.getMessage());

Review Comment:
   let's combine this test w/ one that passes an overriding retryer config, e.g. 
specify that it should retry N times and then look for that number in the 
message text



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -91,9 +108,7 @@ public void execute() throws IOException {
       if (!isJustPriorDestMetadataStillCurrent) {
         throw new IOException("error: likely concurrent writing to destination: " + determinationMsg);
       }
-      Retryer<Void> registerRetryer = RetryerBuilder.<Void>newBuilder()
-          .retryIfException()
-          .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_NUMBER_OF_ATTEMPTS)).build();
+      Retryer<Void> registerRetryer = createRegisterRetryer(ConfigFactory.load());

Review Comment:
   rather than `.load()` turn `this.properties` into a `Config`.
   
   see the `IcebergDatasetFinder`, that `CopySource` 
[instantiates](https://github.com/apache/gobblin/blob/d9f83e638b6eb34fa85b0317eee31cf39c1e8546/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/CopySource.java#L191):
 those are the original properties of the `SourceState` that the enclosing 
gobblin job is launched with.
   
   they arrive here via the `IcebergDatasetFinder` creating an `IcebergDataset` 
that [instantiates this 
class](https://github.com/apache/gobblin/blob/d9f83e638b6eb34fa85b0317eee31cf39c1e8546/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java#L384).
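In the real class, the job's `Properties` would presumably be converted with something like Typesafe Config's `ConfigFactory.parseProperties(...)`. The stdlib-only sketch below captures the same idea without that dependency. It pulls only the retryer entries out of the job's own properties, rather than the process-wide configuration that `ConfigFactory.load()` reads, and strips the prefix so the remaining keys could feed a retryer builder. The class name and key names are hypothetical.

```java
import java.util.Properties;

// Hypothetical sketch: scope retryer settings to the job's own Properties
// instead of process-wide configuration.
class JobScopedSettings {
  /** Returns the entries whose keys start with prefix + ".", with that prefix removed. */
  static Properties subset(Properties jobProps, String prefix) {
    Properties out = new Properties();
    String dotted = prefix + ".";
    for (String name : jobProps.stringPropertyNames()) {
      if (name.startsWith(dotted)) {
        out.setProperty(name.substring(dotted.length()), jobProps.getProperty(name));
      }
    }
    return out;
  }
}
```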



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -102,9 +117,19 @@ public void execute() throws IOException {
       String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
       log.error(msg);
       throw new IOException(msg, tnfe);
-    } catch (ExecutionException | RetryException e) {
-      log.error("Exception Encountered while Registering Iceberg Table", e);
-      throw new RuntimeException(e);
+    } catch (ExecutionException executionException) {
+      String msg = "Failed to register iceberg table";
+      log.error(msg, executionException);

Review Comment:
   given this is a heavily concurrent system and we regularly analyze log msgs 
perhaps across multiple runs, every log message must carry context.  for that 
reason, this class already has the members `srcTableIdStr` and 
`destTableIdStr`.  follow how the latter is used on L103
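Here is one way to follow that advice, sketched with the stdlib only. The helper folds `srcTableIdStr` and `destTableIdStr` into the message and, as the diff already does, chains the `ExecutionException`'s cause rather than the wrapper. The helper name and the exact message layout are hypothetical. The real format should match the existing usage the reviewer points to.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical helper; the message layout is an assumption, not the class's actual format.
class RegistrationFailures {
  static RuntimeException fromExecution(String srcTableIdStr, String destTableIdStr, ExecutionException ee) {
    // Name both tables so interleaved logs from concurrent copies stay attributable.
    String msg = String.format("Failed to register iceberg table '%s' (copied from '%s')", destTableIdStr, srcTableIdStr);
    // Chain the underlying cause, not the ExecutionException wrapper.
    return new RuntimeException(msg, ee.getCause());
  }
}
```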



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -102,9 +117,19 @@ public void execute() throws IOException {
       String msg = "Destination table (with TableMetadata) does not exist: " + tnfe.getMessage();
       log.error(msg);
       throw new IOException(msg, tnfe);
-    } catch (ExecutionException | RetryException e) {
-      log.error("Exception Encountered while Registering Iceberg Table", e);
-      throw new RuntimeException(e);
+    } catch (ExecutionException executionException) {
+      String msg = "Failed to register iceberg table";
+      log.error(msg, executionException);
+      throw new RuntimeException(msg, executionException.getCause());
+    } catch (RetryException retryException) {
+      String interruptedNote = Thread.currentThread().isInterrupted() ? "... then interrupted" : "";
+      String msg = String.format("Failed to register iceberg table (retried %d times %s)",

Review Comment:
   same here about table context
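The same idea applies to the retries-exhausted path. Below is a stdlib-only sketch of a message builder that takes the attempt count and interrupt status as parameters. In the real code these would presumably come from the `RetryException` and from `Thread.currentThread().isInterrupted()`. The builder also drops the space before the note, so the message does not end in `times )` when the note is empty. The helper name and format are hypothetical.

```java
// Hypothetical helper; name and message layout are illustrative assumptions.
class RetryFailureMessages {
  static String exhausted(String destTableIdStr, int failedAttempts, boolean interrupted) {
    String interruptedNote = interrupted ? "... then interrupted" : "";
    // table context comes first; no space before the optional note
    return String.format("Failed to register iceberg table '%s' (retried %d times%s)",
        destTableIdStr, failedAttempts, interruptedNote);
  }
}
```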



##########
gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java:
##########
@@ -54,7 +66,12 @@ public class IcebergRegisterStep implements CommitStep {
   private final TableMetadata readTimeSrcTableMetadata;
   private final TableMetadata justPriorDestTableMetadata;
   private final Properties properties;
-  private final Integer MAX_NUMBER_OF_ATTEMPTS = 3;
+  public static final String ICEBERG_REGISTER_STEP_PREFIX = "icebergRegisterStep";

Review Comment:
   "fully qualify" this by prefixing w/ 
`IcebergDatasetFinder.ICEBERG_DATASET_PREFIX`.
   
   then name it more semantically, like:
   ```
   RETRYER_CONFIG_PREFIX = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".catalog.registration.retries";
   ```



##########
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStepTest.java:
##########
@@ -75,6 +75,48 @@ protected IcebergCatalog createDestinationCatalog() throws IOException {
     }
   }
 
+  @Test
+  public void testRegisterIcebergTableWithRetryer() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).doThrow(new RuntimeException()).doNothing().when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws IOException {
+            return mockSingleTableIcebergCatalog(mockTable);
+          }
+        };
+    try {
+      regStep.execute();
+      Mockito.verify(mockTable, Mockito.times(3)).registerIcebergTable(any(), any());
+    } catch (RuntimeException re) {
+      Assert.fail("Got Unexpected Runtime Exception", re);
+    }
+  }
+
+  @Test
+  public void testRegisterIcebergTableWithRetryerThrowsRuntimeException() throws IOException {
+    TableMetadata justPriorDestTableMetadata = mockTableMetadata("foo", "bar");
+    TableMetadata readTimeSrcTableMetadata = Mockito.mock(TableMetadata.class);
+    IcebergTable mockTable = mockIcebergTable("foo", "bar");
+    Mockito.doThrow(new RuntimeException()).when(mockTable).registerIcebergTable(any(), any());
+    IcebergRegisterStep regStep =
+        new IcebergRegisterStep(srcTableId, destTableId, readTimeSrcTableMetadata, justPriorDestTableMetadata, new Properties()) {
+          @Override
+          protected IcebergCatalog createDestinationCatalog() throws IOException {
+            return mockSingleTableIcebergCatalog(mockTable);
+          }
+        };
+    try {
+      regStep.execute();
+      Assert.fail("Expected Runtime Exception");
+    } catch (RuntimeException re) {
+      Assert.assertTrue(re.getMessage().startsWith("Failed to register iceberg table (retried"), re.getMessage());

Review Comment:
   let's combine this test and perform w/ and w/o passing overriding retryer 
config.  e.g. specify that it should retry N times and then look for that 
number in the message text
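Below is a stdlib-only sketch of that combined check. The same always-failing registration runs twice. The first run has no override and falls back to a default of 3, matching the removed `MAX_NUMBER_OF_ATTEMPTS`. The second run uses a hypothetical `retry_times` override. Each run's failure message is checked for the expected count. In the actual TestNG suite, the two cases could be supplied through a `@DataProvider`. The key name and helper here are assumptions.

```java
import java.util.Properties;

// Hypothetical sketch; the key name and message format are assumptions.
class RetryCountCheck {
  static final String RETRY_TIMES_KEY = "iceberg.dataset.catalog.registration.retries.retry_times";
  static final int DEFAULT_TIMES = 3; // same as the removed MAX_NUMBER_OF_ATTEMPTS

  /** Retries an always-failing registration per the given properties; returns the final failure message. */
  static String failureMessage(Properties props) {
    String raw = props.getProperty(RETRY_TIMES_KEY);
    int maxAttempts = raw == null ? DEFAULT_TIMES : Integer.parseInt(raw.trim());
    // stands in for the mocked table that always throws
    Runnable register = () -> { throw new RuntimeException("simulated registration failure"); };
    int failed = 0;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        register.run();
        return "unexpected success";
      } catch (RuntimeException e) {
        failed++;
      }
    }
    return String.format("Failed to register iceberg table (retried %d times)", failed);
  }
}
```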





Issue Time Tracking
-------------------

    Worklog Id:     (was: 924005)
    Time Spent: 1h 10m  (was: 1h)

> Add retries to OH replication final catalog commit
> --------------------------------------------------
>
>                 Key: GOBBLIN-2088
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-2088
>             Project: Apache Gobblin
>          Issue Type: Task
>            Reporter: Vivek Rai
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
