nsivabalan commented on code in PR #17930:
URL: https://github.com/apache/hudi/pull/17930#discussion_r2734001440


##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java:
##########
@@ -314,4 +318,52 @@ private HoodieActiveTimeline 
createMockTimeline(List<HoodieInstant> instants) {
     timeline.setInstants(instants);
     return timeline;
   }
+
+  @Test
+  void 
testRunPendingTableServicesOperations_withCompactionFailure_whenFailOnMDTTableServiceFailuresIsTrue()
 {
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class, 
RETURNS_DEEP_STUBS);
+    BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+
+    when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+    
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1);
+
+    // Simulate compaction failure
+    RuntimeException compactionException = new RuntimeException("Compaction 
failed");
+    doThrow(compactionException).when(writeClient).runAnyPendingCompactions();
+
+    // When shouldFailOnMDTTableServiceFailures is true (default), exception 
should propagate
+    assertThrows(RuntimeException.class, () ->

Review Comment:
   when the config value is false, can we validate that the exception is 
swallowed and that the caller does not see the exception?



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -882,6 +890,10 @@ public int getRecordPreparationParallelism() {
     return getIntOrDefault(RECORD_PREPARATION_PARALLELISM);
   }
 
+  public boolean shouldFailOnMDTTableServiceFailures() {

Review Comment:
   this config already resides in `HoodieMetadataConfig`. 
   So, we could name this 
   `shouldFailOnTableServiceFailures` 
   
   and in general, we try to avoid using the "MDT" string in any method or 
variable names if possible 



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -604,6 +604,14 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
           + "honor the set value for number of tasks. If not, number of write 
status's from data "
           + "table writes will be used for metadata table record preparation");
 
+  public static final ConfigProperty<Boolean> 
FAIL_ON_MDT_TABLE_SERVICE_FAILURES = ConfigProperty
+      .key(METADATA_PREFIX + ".fail.on.table.services.failures")

Review Comment:
   minor.
   we should align the config key and config property 
   `METADATA_PREFIX + ".fail.on.table.services.failures` -> `METADATA_PREFIX + 
".fail.on.table.service.failures`
   
   essentially, "services" -> "service"
   
   



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -604,6 +604,14 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
           + "honor the set value for number of tasks. If not, number of write 
status's from data "
           + "table writes will be used for metadata table record preparation");
 
+  public static final ConfigProperty<Boolean> 
FAIL_ON_MDT_TABLE_SERVICE_FAILURES = ConfigProperty
+      .key(METADATA_PREFIX + ".fail.on.table.services.failures")

Review Comment:
   oh, lets name the config property 
   FAIL_ON_TABLE_SERVICE_FAILURES



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -1189,6 +1201,11 @@ public Builder withRepartitionDefaultPartitions(int 
defaultPartitions) {
       return this;
     }
 
+    public Builder setFailJobOnMDTTableServiceFailures(boolean 
failJobOnMDTTableServiceFailures) {

Review Comment:
   again, aligning the naming is very important. It might seem trivial, but it will 
matter down the line. 
   
   setFailOnTableServiceFailures(boolean failOnTableServiceFailures) 



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java:
##########
@@ -314,4 +318,52 @@ private HoodieActiveTimeline 
createMockTimeline(List<HoodieInstant> instants) {
     timeline.setInstants(instants);
     return timeline;
   }
+
+  @Test
+  void 
testRunPendingTableServicesOperations_withCompactionFailure_whenFailOnMDTTableServiceFailuresIsTrue()
 {
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class, 
RETURNS_DEEP_STUBS);
+    BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+
+    when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+    
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1);
+
+    // Simulate compaction failure
+    RuntimeException compactionException = new RuntimeException("Compaction 
failed");
+    doThrow(compactionException).when(writeClient).runAnyPendingCompactions();
+
+    // When shouldFailOnMDTTableServiceFailures is true (default), exception 
should propagate
+    assertThrows(RuntimeException.class, () ->
+        
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(
+            metaClient, writeClient, true, Option.empty()),
+        "Expected exception to be thrown when 
shouldFailOnMDTTableServiceFailures is true");
+
+    verify(writeClient, times(1)).runAnyPendingCompactions();
+  }
+
+  @Test
+  void 
testRunPendingTableServicesOperations_withLogCompactionFailure_shouldIncrementMetric()
 {
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class, 
RETURNS_DEEP_STUBS);
+    BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+    HoodieMetadataMetrics metrics = mock(HoodieMetadataMetrics.class);
+
+    when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+    
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(0);
+    
when(timeline.filterPendingLogCompactionTimeline().countInstants()).thenReturn(1);
+
+    // Simulate log compaction failure
+    HoodieException logCompactionException = new HoodieException("Log 
compaction failed");
+    
doThrow(logCompactionException).when(writeClient).runAnyPendingLogCompactions();
+
+    // Exception should propagate and metrics should be incremented
+    assertThrows(HoodieException.class, () ->
+        
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(
+            metaClient, writeClient, true, Option.of(metrics)),
+        "Expected exception to be thrown when log compaction fails");
+
+    verify(writeClient, times(1)).runAnyPendingLogCompactions();
+    verify(metrics, times(1)).incrementMetric(
+        HoodieMetadataMetrics.PENDING_COMPACTIONS_FAILURES, 1);

Review Comment:
   shouldn't we also validate that "logcompaction_failures" is emitted? 
   



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java:
##########
@@ -314,4 +318,52 @@ private HoodieActiveTimeline 
createMockTimeline(List<HoodieInstant> instants) {
     timeline.setInstants(instants);
     return timeline;
   }
+
+  @Test
+  void 
testRunPendingTableServicesOperations_withCompactionFailure_whenFailOnMDTTableServiceFailuresIsTrue()
 {
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+    HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class, 
RETURNS_DEEP_STUBS);
+    BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+
+    when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+    
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(1);
+
+    // Simulate compaction failure
+    RuntimeException compactionException = new RuntimeException("Compaction 
failed");
+    doThrow(compactionException).when(writeClient).runAnyPendingCompactions();
+
+    // When shouldFailOnMDTTableServiceFailures is true (default), exception 
should propagate
+    assertThrows(RuntimeException.class, () ->
+        
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(
+            metaClient, writeClient, true, Option.empty()),
+        "Expected exception to be thrown when 
shouldFailOnMDTTableServiceFailures is true");
+
+    verify(writeClient, times(1)).runAnyPendingCompactions();

Review Comment:
   where are we validating the metrics?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2064,35 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> 
writeClient, Option<Strin
     // let's say we trigger compaction after C5 in MDT and so compaction 
completes with C4001. but C5 crashed before completing in MDT.
     // and again w/ C6, we will re-attempt compaction at which point latest 
delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction 
if we already have compaction w/ same instant time.
-    if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
 {
-      LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
-    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
-      LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
-      writeClient.compact(compactionInstantTime, true);
-    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
-      // Schedule and execute log compaction with new instant time.
-      Option<String> scheduledLogCompaction = 
writeClient.scheduleLogCompaction(Option.empty());
-      if (scheduledLogCompaction.isPresent()) {
-        LOG.info("Log compaction is scheduled for timestamp {}", 
scheduledLogCompaction.get());
-        writeClient.logCompact(scheduledLogCompaction.get(), true);
+    boolean skipCompactions = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+    try {
+      if (skipCompactions) {
+        LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+      } else if 
(writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
+        LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
+        writeClient.compact(compactionInstantTime, true);
       }
+    } catch (Exception e) {
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+      LOG.error("Error in scheduling and executing compaction on metadata 
table ", e);
+      throw e;
+    }
+
+    try {
+      if (skipCompactions) {
+        LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+      } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+        // Schedule and execute log compaction with new instant time.
+        Option<String> scheduledLogCompaction = 
writeClient.scheduleLogCompaction(Option.empty());
+        if (scheduledLogCompaction.isPresent()) {
+          LOG.info("Log compaction is scheduled for timestamp {}", 
scheduledLogCompaction.get());
+          writeClient.logCompact(scheduledLogCompaction.get(), true);
+        }
+      }
+    } catch (Exception e) {
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.LOG_COMPACTION_FAILURES, 1));
+      LOG.error("Error in scheduling and executing logcompaction  on metadata 
table ", e);

Review Comment:
   minor.
   "in metadata table" 
   also, looks like we have double spacing between "on" and "metadata". 
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2064,35 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> 
writeClient, Option<Strin
     // let's say we trigger compaction after C5 in MDT and so compaction 
completes with C4001. but C5 crashed before completing in MDT.
     // and again w/ C6, we will re-attempt compaction at which point latest 
delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction 
if we already have compaction w/ same instant time.
-    if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
 {
-      LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
-    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
-      LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
-      writeClient.compact(compactionInstantTime, true);
-    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
-      // Schedule and execute log compaction with new instant time.
-      Option<String> scheduledLogCompaction = 
writeClient.scheduleLogCompaction(Option.empty());
-      if (scheduledLogCompaction.isPresent()) {
-        LOG.info("Log compaction is scheduled for timestamp {}", 
scheduledLogCompaction.get());
-        writeClient.logCompact(scheduledLogCompaction.get(), true);
+    boolean skipCompactions = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+    try {
+      if (skipCompactions) {
+        LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+      } else if 
(writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
+        LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
+        writeClient.compact(compactionInstantTime, true);
       }
+    } catch (Exception e) {
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+      LOG.error("Error in scheduling and executing compaction on metadata 
table ", e);
+      throw e;
+    }
+
+    try {
+      if (skipCompactions) {
+        LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+      } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+        // Schedule and execute log compaction with new instant time.
+        Option<String> scheduledLogCompaction = 
writeClient.scheduleLogCompaction(Option.empty());
+        if (scheduledLogCompaction.isPresent()) {
+          LOG.info("Log compaction is scheduled for timestamp {}", 
scheduledLogCompaction.get());
+          writeClient.logCompact(scheduledLogCompaction.get(), true);
+        }
+      }
+    } catch (Exception e) {
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.LOG_COMPACTION_FAILURES, 1));
+      LOG.error("Error in scheduling and executing logcompaction  on metadata 
table ", e);

Review Comment:
   let's remove the trailing space at the end



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to