nsivabalan commented on code in PR #17930:
URL: https://github.com/apache/hudi/pull/17930#discussion_r2729633991


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2063,33 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> 
writeClient, Option<Strin
     // let's say we trigger compaction after C5 in MDT and so compaction 
completes with C4001. but C5 crashed before completing in MDT.
     // and again w/ C6, we will re-attempt compaction at which point latest 
delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction 
if we already have compaction w/ same instant time.
-    if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
 {
-      LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
-    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
-      LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
-      writeClient.compact(compactionInstantTime, true);
-    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
-      // Schedule and execute log compaction with new instant time.
-      Option<String> scheduledLogCompaction = 
writeClient.scheduleLogCompaction(Option.empty());
-      if (scheduledLogCompaction.isPresent()) {
-        LOG.info("Log compaction is scheduled for timestamp {}", 
scheduledLogCompaction.get());
-        writeClient.logCompact(scheduledLogCompaction.get(), true);
+    boolean skipCompactions = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+    try {
+      if (skipCompactions) {
+        LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+      } else if 
(writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
+        LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
+        writeClient.compact(compactionInstantTime, true);
       }
+    } catch (Exception e) {
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+      LOG.error("Error in scheduling and executing compaction ", e);

Review Comment:
   Do we want to denote in the log message that this is for the `metadata table`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2012,19 +2015,25 @@ public void performTableServices(Option<String> 
inFlightInstantTimestamp, boolea
 
   static HoodieActiveTimeline 
runPendingTableServicesOperationsAndRefreshTimeline(HoodieTableMetaClient 
metadataMetaClient,
                                                                                
   BaseHoodieWriteClient<?, ?, ?, ?> writeClient,
-                                                                               
   boolean initialTimelineRequiresRefresh) {
-    HoodieActiveTimeline activeTimeline = initialTimelineRequiresRefresh ? 
metadataMetaClient.reloadActiveTimeline() : 
metadataMetaClient.getActiveTimeline();
-    // finish off any pending log compaction or compactions operations if any 
from previous attempt.
-    boolean ranServices = false;
-    if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 0) {
-      writeClient.runAnyPendingCompactions();
-      ranServices = true;
-    }
-    if (activeTimeline.filterPendingLogCompactionTimeline().countInstants() > 
0) {
-      writeClient.runAnyPendingLogCompactions();
-      ranServices = true;
+                                                                               
   boolean initialTimelineRequiresRefresh,
+                                                                               
   Option<HoodieMetadataMetrics> metricsOption) {
+    try {
+      HoodieActiveTimeline activeTimeline = initialTimelineRequiresRefresh ? 
metadataMetaClient.reloadActiveTimeline() : 
metadataMetaClient.getActiveTimeline();
+      // finish off any pending log compaction or compactions operations if 
any from previous attempt.
+      boolean ranServices = false;
+      if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 
0) {
+        writeClient.runAnyPendingCompactions();
+        ranServices = true;
+      }
+      if (activeTimeline.filterPendingLogCompactionTimeline().countInstants() 
> 0) {
+        writeClient.runAnyPendingLogCompactions();
+        ranServices = true;
+      }
+      return ranServices ? metadataMetaClient.reloadActiveTimeline() : 
activeTimeline;
+    } finally {
+      metricsOption.ifPresent(m -> m.incrementMetric(

Review Comment:
   Why are we emitting a failure metric here?
   If we want to capture a failure metric, shouldn't we do it within a `catch`
block rather than a `finally` block?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java:
##########
@@ -79,6 +79,9 @@ public class HoodieMetadataMetrics implements Serializable {
   public static final String TABLE_SERVICE_EXECUTION_STATUS = 
"table_service_execution_status";
   public static final String TABLE_SERVICE_EXECUTION_DURATION = 
"table_service_execution_duration";
   public static final String ASYNC_INDEXER_CATCHUP_TIME = 
"async_indexer_catchup_time";
+  public static final String COMPACTION_FAILURES = "compaction_failures";

Review Comment:
   We need to test these as well. 



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -2054,18 +2063,33 @@ void compactIfNecessary(BaseHoodieWriteClient<?,I,?,O> 
writeClient, Option<Strin
     // let's say we trigger compaction after C5 in MDT and so compaction 
completes with C4001. but C5 crashed before completing in MDT.
     // and again w/ C6, we will re-attempt compaction at which point latest 
delta commit is C4 in MDT.
     // and so we try compaction w/ instant C4001. So, we can avoid compaction 
if we already have compaction w/ same instant time.
-    if 
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
 {
-      LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
-    } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
-      LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
-      writeClient.compact(compactionInstantTime, true);
-    } else if (metadataWriteConfig.isLogCompactionEnabled()) {
-      // Schedule and execute log compaction with new instant time.
-      Option<String> scheduledLogCompaction = 
writeClient.scheduleLogCompaction(Option.empty());
-      if (scheduledLogCompaction.isPresent()) {
-        LOG.info("Log compaction is scheduled for timestamp {}", 
scheduledLogCompaction.get());
-        writeClient.logCompact(scheduledLogCompaction.get(), true);
+    boolean skipCompactions = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+    try {
+      if (skipCompactions) {
+        LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+      } else if 
(writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {
+        LOG.info("Compaction is scheduled for timestamp {}", 
compactionInstantTime);
+        writeClient.compact(compactionInstantTime, true);
       }
+    } catch (Exception e) {
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+      LOG.error("Error in scheduling and executing compaction ", e);
+    }
+
+    try {
+      if (skipCompactions) {
+        LOG.info("Compaction with same {} time is already present in the 
timeline.", compactionInstantTime);
+      } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+        // Schedule and execute log compaction with new instant time.
+        Option<String> scheduledLogCompaction = 
writeClient.scheduleLogCompaction(Option.empty());
+        if (scheduledLogCompaction.isPresent()) {
+          LOG.info("Log compaction is scheduled for timestamp {}", 
scheduledLogCompaction.get());
+          writeClient.logCompact(scheduledLogCompaction.get(), true);
+        }
+      }
+    } catch (Exception e) {
+      metrics.ifPresent(m -> 
m.incrementMetric(HoodieMetadataMetrics.LOG_COMPACTION_FAILURES, 1));
+      LOG.error("Error in scheduling and executing logcompaction ", e);

Review Comment:
   same here



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -604,6 +604,14 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
           + "honor the set value for number of tasks. If not, number of write 
status's from data "
           + "table writes will be used for metadata table record preparation");
 
+  public static final ConfigProperty<Boolean> 
FAIL_JOB_ON_MDT_TABLE_SERVICE_FAILURES = ConfigProperty
+      .key(METADATA_PREFIX + ".fails.job.on.table.services.failures")

Review Comment:
   `fails.job.on` does not sit well in a config key.
   How about
   `hoodie.metadata.write.fail.on.table.service.failures`?
   



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java:
##########
@@ -1189,6 +1201,11 @@ public Builder withRepartitionDefaultPartitions(int 
defaultPartitions) {
       return this;
     }
 
+    public Builder setFailJobOnMDTTableServiceFailures(boolean 
failJobOnMDTTableServiceFailures) {

Review Comment:
   Let's test these in `TestHoodieMetadataConfig`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to