This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a0ca3eadba5 [HUDI-6316] Adding corrupted and rollback log blocks metrics (#8881)
a0ca3eadba5 is described below

commit a0ca3eadba5a041aaaf97d040adffc017cfdd285
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Sun Jul 9 23:10:43 2023 -0400

    [HUDI-6316] Adding corrupted and rollback log blocks metrics (#8881)
    
    Adding log block metrics to track corrupted log blocks and rollback blocks.
    Users need to set `hoodie.metrics.compaction.log.blocks.on` to `true`
    to enable these metrics.
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  7 ++
 .../hudi/config/metrics/HoodieMetricsConfig.java   | 11 +++
 .../org/apache/hudi/metrics/HoodieMetrics.java     | 79 ++++++++++++++--------
 .../org/apache/hudi/metrics/TestHoodieMetrics.java | 41 ++++++-----
 .../hudi/common/model/HoodieCommitMetadata.java    | 20 ++++++
 5 files changed, 113 insertions(+), 45 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 93105491180..1390408d901 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2088,6 +2088,13 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getBoolean(HoodieMetricsConfig.TURN_METRICS_ON);
   }
 
+  /**
+   * metrics properties.
+   */
+  public boolean isCompactionLogBlockMetricsOn() {
+    return 
getBoolean(HoodieMetricsConfig.TURN_METRICS_COMPACTION_LOG_BLOCKS_ON);
+  }
+
   public boolean isExecutorMetricsEnabled() {
     return Boolean.parseBoolean(
         getStringOrDefault(HoodieMetricsConfig.EXECUTOR_METRICS_ENABLE, 
"false"));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
index 9fe9b33a546..e1d0afeb6fa 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java
@@ -106,6 +106,12 @@ public class HoodieMetricsConfig extends HoodieConfig {
       .sinceVersion("0.14.0")
       .withDocumentation("Comma separated list of config file paths for metric 
exporter configs");
 
+  public static final ConfigProperty<Boolean> 
TURN_METRICS_COMPACTION_LOG_BLOCKS_ON = ConfigProperty
+      .key(METRIC_PREFIX + "compaction.log.blocks.on")
+      .defaultValue(false)
+      .sinceVersion("0.14.0")
+      .withDocumentation("Turn on/off metrics reporting for log blocks with 
compaction commit. off by default.");
+
   /**
    * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead
    */
@@ -171,6 +177,11 @@ public class HoodieMetricsConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder compactionLogBlocksEnable(boolean 
compactionLogBlockMetricsEnable) {
+      hoodieMetricsConfig.setValue(TURN_METRICS_COMPACTION_LOG_BLOCKS_ON, 
String.valueOf(compactionLogBlockMetricsEnable));
+      return this;
+    }
+
     public Builder withReporterType(String reporterType) {
       hoodieMetricsConfig.setValue(METRICS_REPORTER_TYPE_VALUE, reporterType);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index dac680a5c40..792d0cd0844 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -37,6 +37,23 @@ public class HoodieMetrics {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HoodieMetrics.class);
 
+  public static final String TOTAL_PARTITIONS_WRITTEN_STR = 
"totalPartitionsWritten";
+  public static final String TOTAL_FILES_INSERT_STR = "totalFilesInsert";
+  public static final String TOTAL_FILES_UPDATE_STR = "totalFilesUpdate";
+  public static final String TOTAL_RECORDS_WRITTEN_STR = "totalRecordsWritten";
+  public static final String TOTAL_UPDATE_RECORDS_WRITTEN_STR = 
"totalUpdateRecordsWritten";
+  public static final String TOTAL_INSERT_RECORDS_WRITTEN_STR = 
"totalInsertRecordsWritten";
+  public static final String TOTAL_BYTES_WRITTEN_STR = "totalBytesWritten";
+  public static final String TOTAL_SCAN_TIME_STR = "totalScanTime";
+  public static final String TOTAL_CREATE_TIME_STR = "totalCreateTime";
+  public static final String TOTAL_UPSERT_TIME_STR = "totalUpsertTime";
+  public static final String TOTAL_COMPACTED_RECORDS_UPDATED_STR = 
"totalCompactedRecordsUpdated";
+  public static final String TOTAL_LOG_FILES_COMPACTED_STR = 
"totalLogFilesCompacted";
+  public static final String TOTAL_LOG_FILES_SIZE_STR = "totalLogFilesSize";
+  public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted";
+  public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = 
"totalCorruptedLogBlocks";
+  public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = 
"totalRollbackLogBlocks";
+
   private Metrics metrics;
   // Some timers
   public String rollbackTimerName = null;
@@ -175,20 +192,20 @@ public class HoodieMetrics {
       // No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY.
       return;
     }
-    metrics.registerGauge(getMetricsName(actionType, 
"totalPartitionsWritten"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 
0);
-    metrics.registerGauge(getMetricsName(actionType, 
"totalUpdateRecordsWritten"), 0);
-    metrics.registerGauge(getMetricsName(actionType, 
"totalInsertRecordsWritten"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 
0);
-    metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0);
-    metrics.registerGauge(getMetricsName(actionType, 
"totalCompactedRecordsUpdated"), 0);
-    metrics.registerGauge(getMetricsName(actionType, 
"totalLogFilesCompacted"), 0);
-    metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0);
+    metrics.registerGauge(getMetricsName(actionType, 
TOTAL_PARTITIONS_WRITTEN_STR), 0);
+    metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_INSERT_STR), 
0);
+    metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_UPDATE_STR), 
0);
+    metrics.registerGauge(getMetricsName(actionType, 
TOTAL_RECORDS_WRITTEN_STR), 0);
+    metrics.registerGauge(getMetricsName(actionType, 
TOTAL_UPDATE_RECORDS_WRITTEN_STR), 0);
+    metrics.registerGauge(getMetricsName(actionType, 
TOTAL_INSERT_RECORDS_WRITTEN_STR), 0);
+    metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_DELETED), 
0);
+    metrics.registerGauge(getMetricsName(actionType, TOTAL_BYTES_WRITTEN_STR), 
0);
+    metrics.registerGauge(getMetricsName(actionType, TOTAL_SCAN_TIME_STR), 0);
+    metrics.registerGauge(getMetricsName(actionType, TOTAL_CREATE_TIME_STR), 
0);
+    metrics.registerGauge(getMetricsName(actionType, TOTAL_UPSERT_TIME_STR), 
0);
+    metrics.registerGauge(getMetricsName(actionType, 
TOTAL_COMPACTED_RECORDS_UPDATED_STR), 0);
+    metrics.registerGauge(getMetricsName(actionType, 
TOTAL_LOG_FILES_COMPACTED_STR), 0);
+    metrics.registerGauge(getMetricsName(actionType, 
TOTAL_LOG_FILES_SIZE_STR), 0);
   }
 
   public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, 
HoodieCommitMetadata metadata,
@@ -209,20 +226,26 @@ public class HoodieMetrics {
       long totalCompactedRecordsUpdated = 
metadata.getTotalCompactedRecordsUpdated();
       long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted();
       long totalLogFilesSize = metadata.getTotalLogFilesSize();
-      metrics.registerGauge(getMetricsName(actionType, 
"totalPartitionsWritten"), totalPartitionsWritten);
-      metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 
totalFilesInsert);
-      metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 
totalFilesUpdate);
-      metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 
totalRecordsWritten);
-      metrics.registerGauge(getMetricsName(actionType, 
"totalUpdateRecordsWritten"), totalUpdateRecordsWritten);
-      metrics.registerGauge(getMetricsName(actionType, 
"totalInsertRecordsWritten"), totalInsertRecordsWritten);
-      metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 
totalBytesWritten);
-      metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 
totalTimeTakenByScanner);
-      metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 
totalTimeTakenForInsert);
-      metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 
totalTimeTakenForUpsert);
-      metrics.registerGauge(getMetricsName(actionType, 
"totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated);
-      metrics.registerGauge(getMetricsName(actionType, 
"totalLogFilesCompacted"), totalLogFilesCompacted);
-      metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 
totalLogFilesSize);
-      metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 
totalRecordsDeleted);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_PARTITIONS_WRITTEN_STR), totalPartitionsWritten);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_FILES_INSERT_STR), totalFilesInsert);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_FILES_UPDATE_STR), totalFilesUpdate);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_RECORDS_WRITTEN_STR), totalRecordsWritten);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_UPDATE_RECORDS_WRITTEN_STR), totalUpdateRecordsWritten);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_INSERT_RECORDS_WRITTEN_STR), totalInsertRecordsWritten);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_BYTES_WRITTEN_STR), totalBytesWritten);
+      metrics.registerGauge(getMetricsName(actionType, TOTAL_SCAN_TIME_STR), 
totalTimeTakenByScanner);
+      metrics.registerGauge(getMetricsName(actionType, TOTAL_CREATE_TIME_STR), 
totalTimeTakenForInsert);
+      metrics.registerGauge(getMetricsName(actionType, TOTAL_UPSERT_TIME_STR), 
totalTimeTakenForUpsert);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_COMPACTED_RECORDS_UPDATED_STR), totalCompactedRecordsUpdated);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_LOG_FILES_COMPACTED_STR), totalLogFilesCompacted);
+      metrics.registerGauge(getMetricsName(actionType, 
TOTAL_LOG_FILES_SIZE_STR), totalLogFilesSize);
+      metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_DELETED), 
totalRecordsDeleted);
+      if (config.isCompactionLogBlockMetricsOn()) {
+        long totalCorruptedLogBlocks = metadata.getTotalCorruptLogBlocks();
+        long totalRollbackLogBlocks = metadata.getTotalRollbackLogBlocks();
+        metrics.registerGauge(getMetricsName(actionType, 
TOTAL_CORRUPTED_LOG_BLOCKS_STR), totalCorruptedLogBlocks);
+        metrics.registerGauge(getMetricsName(actionType, 
TOTAL_ROLLBACK_LOG_BLOCKS_STR), totalRollbackLogBlocks);
+      }
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index 1598810ce42..f305c9d1776 100755
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -72,7 +72,6 @@ public class TestHoodieMetrics {
   @Test
   public void testTimerCtx() throws InterruptedException {
     Random rand = new Random();
-
     // Index metrics
     Timer.Context timer = hoodieMetrics.getIndexCtx();
     Thread.sleep(5); // Ensure timer duration is > 0
@@ -141,42 +140,50 @@ public class TestHoodieMetrics {
       when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12);
       when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13);
       when(metadata.getTotalRecordsDeleted()).thenReturn(randomValue + 14);
+      when(metadata.getTotalCorruptLogBlocks()).thenReturn(randomValue + 15);
+      when(metadata.getTotalRollbackLogBlocks()).thenReturn(randomValue + 16);
       
when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), 
Option.empty()));
-      hoodieMetrics.updateCommitMetrics(randomValue + 15, commitTimer.stop(), 
metadata, action);
+      when(config.isCompactionLogBlockMetricsOn()).thenReturn(true);
+
+      hoodieMetrics.updateCommitMetrics(randomValue + 17, commitTimer.stop(), 
metadata, action);
 
       String metricname = hoodieMetrics.getMetricsName(action, "duration");
       long duration = 
(Long)metrics.getRegistry().getGauges().get(metricname).getValue();
       assertTrue(duration > 0);
-      metricname = hoodieMetrics.getMetricsName(action, 
"totalPartitionsWritten");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_PARTITIONS_WRITTEN_STR);
       
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 metadata.fetchTotalPartitionsWritten());
-      metricname = hoodieMetrics.getMetricsName(action, "totalFilesInsert");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_FILES_INSERT_STR);
       
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 metadata.fetchTotalFilesInsert());
-      metricname = hoodieMetrics.getMetricsName(action, "totalFilesUpdate");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_FILES_UPDATE_STR);
       
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 metadata.fetchTotalFilesUpdated());
-      metricname = hoodieMetrics.getMetricsName(action, "totalRecordsWritten");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_RECORDS_WRITTEN_STR);
       
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 metadata.fetchTotalRecordsWritten());
-      metricname = hoodieMetrics.getMetricsName(action, 
"totalUpdateRecordsWritten");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_UPDATE_RECORDS_WRITTEN_STR);
       
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 metadata.fetchTotalUpdateRecordsWritten());
-      metricname = hoodieMetrics.getMetricsName(action, 
"totalInsertRecordsWritten");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_INSERT_RECORDS_WRITTEN_STR);
       
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 metadata.fetchTotalInsertRecordsWritten());
-      metricname = hoodieMetrics.getMetricsName(action, "totalBytesWritten");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_BYTES_WRITTEN_STR);
       
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 metadata.fetchTotalBytesWritten());
       metricname = hoodieMetrics.getMetricsName(action, "commitTime");
-      
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 randomValue + 15);
-      metricname = hoodieMetrics.getMetricsName(action, "totalScanTime");
+      
assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(),
 randomValue + 17);
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_SCAN_TIME_STR);
       
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalScanTime());
-      metricname = hoodieMetrics.getMetricsName(action, "totalCreateTime");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_CREATE_TIME_STR);
       
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalCreateTime());
-      metricname = hoodieMetrics.getMetricsName(action, "totalUpsertTime");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_UPSERT_TIME_STR);
       
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalUpsertTime());
-      metricname = hoodieMetrics.getMetricsName(action, 
"totalCompactedRecordsUpdated");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_COMPACTED_RECORDS_UPDATED_STR);
       
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalCompactedRecordsUpdated());
-      metricname = hoodieMetrics.getMetricsName(action, 
"totalLogFilesCompacted");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_LOG_FILES_COMPACTED_STR);
       
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalLogFilesCompacted());
-      metricname = hoodieMetrics.getMetricsName(action, "totalLogFilesSize");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_LOG_FILES_SIZE_STR);
       
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalLogFilesSize());
-      metricname = hoodieMetrics.getMetricsName(action, "totalRecordsDeleted");
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_RECORDS_DELETED);
       
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalRecordsDeleted());
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_CORRUPTED_LOG_BLOCKS_STR);
+      
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalCorruptLogBlocks());
+      metricname = hoodieMetrics.getMetricsName(action, 
HoodieMetrics.TOTAL_ROLLBACK_LOG_BLOCKS_STR);
+      
assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), 
metadata.getTotalRollbackLogBlocks());
     });
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index 1ed8ff241dc..795e6cfe7a6 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -398,6 +398,26 @@ public class HoodieCommitMetadata implements Serializable {
     return totalUpdateRecords;
   }
 
+  public Long getTotalCorruptLogBlocks() {
+    Long totalCorruptedLogBlocks = 0L;
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+      for (HoodieWriteStat writeStat : entry.getValue()) {
+        totalCorruptedLogBlocks += writeStat.getTotalCorruptLogBlock();
+      }
+    }
+    return totalCorruptedLogBlocks;
+  }
+
+  public Long getTotalRollbackLogBlocks() {
+    Long totalRollbackLogBlocks = 0L;
+    for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {
+      for (HoodieWriteStat writeStat : entry.getValue()) {
+        totalRollbackLogBlocks += writeStat.getTotalRollbackBlocks();
+      }
+    }
+    return totalRollbackLogBlocks;
+  }
+
   public Long getTotalLogFilesSize() {
     Long totalLogFilesSize = 0L;
     for (Map.Entry<String, List<HoodieWriteStat>> entry : 
partitionToWriteStats.entrySet()) {

Reply via email to