This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new a0ca3eadba5 [HUDI-6316] Adding corrupted and rollback log blocks metrics (#8881) a0ca3eadba5 is described below commit a0ca3eadba5a041aaaf97d040adffc017cfdd285 Author: Sivabalan Narayanan <n.siv...@gmail.com> AuthorDate: Sun Jul 9 23:10:43 2023 -0400 [HUDI-6316] Adding corrupted and rollback log blocks metrics (#8881) Adding log block metrics to track corrupted log blocks and rollback log blocks. Users need to set `hoodie.metrics.compaction.log.blocks.on` to true to enable the metrics. --- .../org/apache/hudi/config/HoodieWriteConfig.java | 7 ++ .../hudi/config/metrics/HoodieMetricsConfig.java | 11 +++ .../org/apache/hudi/metrics/HoodieMetrics.java | 79 ++++++++++++++-------- .../org/apache/hudi/metrics/TestHoodieMetrics.java | 41 ++++++----- .../hudi/common/model/HoodieCommitMetadata.java | 20 ++++++ 5 files changed, 113 insertions(+), 45 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 93105491180..1390408d901 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2088,6 +2088,13 @@ public class HoodieWriteConfig extends HoodieConfig { return getBoolean(HoodieMetricsConfig.TURN_METRICS_ON); } + /** + * metrics properties.
+ */ + public boolean isCompactionLogBlockMetricsOn() { + return getBoolean(HoodieMetricsConfig.TURN_METRICS_COMPACTION_LOG_BLOCKS_ON); + } + public boolean isExecutorMetricsEnabled() { return Boolean.parseBoolean( getStringOrDefault(HoodieMetricsConfig.EXECUTOR_METRICS_ENABLE, "false")); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java index 9fe9b33a546..e1d0afeb6fa 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/metrics/HoodieMetricsConfig.java @@ -106,6 +106,12 @@ public class HoodieMetricsConfig extends HoodieConfig { .sinceVersion("0.14.0") .withDocumentation("Comma separated list of config file paths for metric exporter configs"); + public static final ConfigProperty<Boolean> TURN_METRICS_COMPACTION_LOG_BLOCKS_ON = ConfigProperty + .key(METRIC_PREFIX + "compaction.log.blocks.on") + .defaultValue(false) + .sinceVersion("0.14.0") + .withDocumentation("Turn on/off metrics reporting for log blocks with compaction commit. 
off by default."); + /** * @deprecated Use {@link #TURN_METRICS_ON} and its methods instead */ @@ -171,6 +177,11 @@ public class HoodieMetricsConfig extends HoodieConfig { return this; } + public Builder compactionLogBlocksEnable(boolean compactionLogBlockMetricsEnable) { + hoodieMetricsConfig.setValue(TURN_METRICS_COMPACTION_LOG_BLOCKS_ON, String.valueOf(compactionLogBlockMetricsEnable)); + return this; + } + public Builder withReporterType(String reporterType) { hoodieMetricsConfig.setValue(METRICS_REPORTER_TYPE_VALUE, reporterType); return this; diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java index dac680a5c40..792d0cd0844 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java @@ -37,6 +37,23 @@ public class HoodieMetrics { private static final Logger LOG = LoggerFactory.getLogger(HoodieMetrics.class); + public static final String TOTAL_PARTITIONS_WRITTEN_STR = "totalPartitionsWritten"; + public static final String TOTAL_FILES_INSERT_STR = "totalFilesInsert"; + public static final String TOTAL_FILES_UPDATE_STR = "totalFilesUpdate"; + public static final String TOTAL_RECORDS_WRITTEN_STR = "totalRecordsWritten"; + public static final String TOTAL_UPDATE_RECORDS_WRITTEN_STR = "totalUpdateRecordsWritten"; + public static final String TOTAL_INSERT_RECORDS_WRITTEN_STR = "totalInsertRecordsWritten"; + public static final String TOTAL_BYTES_WRITTEN_STR = "totalBytesWritten"; + public static final String TOTAL_SCAN_TIME_STR = "totalScanTime"; + public static final String TOTAL_CREATE_TIME_STR = "totalCreateTime"; + public static final String TOTAL_UPSERT_TIME_STR = "totalUpsertTime"; + public static final String TOTAL_COMPACTED_RECORDS_UPDATED_STR = "totalCompactedRecordsUpdated"; + public 
static final String TOTAL_LOG_FILES_COMPACTED_STR = "totalLogFilesCompacted"; + public static final String TOTAL_LOG_FILES_SIZE_STR = "totalLogFilesSize"; + public static final String TOTAL_RECORDS_DELETED = "totalRecordsDeleted"; + public static final String TOTAL_CORRUPTED_LOG_BLOCKS_STR = "totalCorruptedLogBlocks"; + public static final String TOTAL_ROLLBACK_LOG_BLOCKS_STR = "totalRollbackLogBlocks"; + private Metrics metrics; // Some timers public String rollbackTimerName = null; @@ -175,20 +192,20 @@ public class HoodieMetrics { // No-op if metrics are not of type PROMETHEUS_PUSHGATEWAY. return; } - metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), 0); - metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_PARTITIONS_WRITTEN_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_INSERT_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_UPDATE_STR), 0); + 
metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_WRITTEN_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_UPDATE_RECORDS_WRITTEN_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_INSERT_RECORDS_WRITTEN_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_DELETED), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_BYTES_WRITTEN_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_SCAN_TIME_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_CREATE_TIME_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_UPSERT_TIME_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_COMPACTED_RECORDS_UPDATED_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_LOG_FILES_COMPACTED_STR), 0); + metrics.registerGauge(getMetricsName(actionType, TOTAL_LOG_FILES_SIZE_STR), 0); } public void updateCommitMetrics(long commitEpochTimeInMs, long durationInMs, HoodieCommitMetadata metadata, @@ -209,20 +226,26 @@ public class HoodieMetrics { long totalCompactedRecordsUpdated = metadata.getTotalCompactedRecordsUpdated(); long totalLogFilesCompacted = metadata.getTotalLogFilesCompacted(); long totalLogFilesSize = metadata.getTotalLogFilesSize(); - metrics.registerGauge(getMetricsName(actionType, "totalPartitionsWritten"), totalPartitionsWritten); - metrics.registerGauge(getMetricsName(actionType, "totalFilesInsert"), totalFilesInsert); - metrics.registerGauge(getMetricsName(actionType, "totalFilesUpdate"), totalFilesUpdate); - metrics.registerGauge(getMetricsName(actionType, "totalRecordsWritten"), totalRecordsWritten); - metrics.registerGauge(getMetricsName(actionType, "totalUpdateRecordsWritten"), totalUpdateRecordsWritten); - metrics.registerGauge(getMetricsName(actionType, "totalInsertRecordsWritten"), totalInsertRecordsWritten); - metrics.registerGauge(getMetricsName(actionType, "totalBytesWritten"), totalBytesWritten); - 
metrics.registerGauge(getMetricsName(actionType, "totalScanTime"), totalTimeTakenByScanner); - metrics.registerGauge(getMetricsName(actionType, "totalCreateTime"), totalTimeTakenForInsert); - metrics.registerGauge(getMetricsName(actionType, "totalUpsertTime"), totalTimeTakenForUpsert); - metrics.registerGauge(getMetricsName(actionType, "totalCompactedRecordsUpdated"), totalCompactedRecordsUpdated); - metrics.registerGauge(getMetricsName(actionType, "totalLogFilesCompacted"), totalLogFilesCompacted); - metrics.registerGauge(getMetricsName(actionType, "totalLogFilesSize"), totalLogFilesSize); - metrics.registerGauge(getMetricsName(actionType, "totalRecordsDeleted"), totalRecordsDeleted); + metrics.registerGauge(getMetricsName(actionType, TOTAL_PARTITIONS_WRITTEN_STR), totalPartitionsWritten); + metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_INSERT_STR), totalFilesInsert); + metrics.registerGauge(getMetricsName(actionType, TOTAL_FILES_UPDATE_STR), totalFilesUpdate); + metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_WRITTEN_STR), totalRecordsWritten); + metrics.registerGauge(getMetricsName(actionType, TOTAL_UPDATE_RECORDS_WRITTEN_STR), totalUpdateRecordsWritten); + metrics.registerGauge(getMetricsName(actionType, TOTAL_INSERT_RECORDS_WRITTEN_STR), totalInsertRecordsWritten); + metrics.registerGauge(getMetricsName(actionType, TOTAL_BYTES_WRITTEN_STR), totalBytesWritten); + metrics.registerGauge(getMetricsName(actionType, TOTAL_SCAN_TIME_STR), totalTimeTakenByScanner); + metrics.registerGauge(getMetricsName(actionType, TOTAL_CREATE_TIME_STR), totalTimeTakenForInsert); + metrics.registerGauge(getMetricsName(actionType, TOTAL_UPSERT_TIME_STR), totalTimeTakenForUpsert); + metrics.registerGauge(getMetricsName(actionType, TOTAL_COMPACTED_RECORDS_UPDATED_STR), totalCompactedRecordsUpdated); + metrics.registerGauge(getMetricsName(actionType, TOTAL_LOG_FILES_COMPACTED_STR), totalLogFilesCompacted); + metrics.registerGauge(getMetricsName(actionType, 
TOTAL_LOG_FILES_SIZE_STR), totalLogFilesSize); + metrics.registerGauge(getMetricsName(actionType, TOTAL_RECORDS_DELETED), totalRecordsDeleted); + if (config.isCompactionLogBlockMetricsOn()) { + long totalCorruptedLogBlocks = metadata.getTotalCorruptLogBlocks(); + long totalRollbackLogBlocks = metadata.getTotalRollbackLogBlocks(); + metrics.registerGauge(getMetricsName(actionType, TOTAL_CORRUPTED_LOG_BLOCKS_STR), totalCorruptedLogBlocks); + metrics.registerGauge(getMetricsName(actionType, TOTAL_ROLLBACK_LOG_BLOCKS_STR), totalRollbackLogBlocks); + } } } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java index 1598810ce42..f305c9d1776 100755 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java @@ -72,7 +72,6 @@ public class TestHoodieMetrics { @Test public void testTimerCtx() throws InterruptedException { Random rand = new Random(); - // Index metrics Timer.Context timer = hoodieMetrics.getIndexCtx(); Thread.sleep(5); // Ensure timer duration is > 0 @@ -141,42 +140,50 @@ public class TestHoodieMetrics { when(metadata.getTotalLogFilesCompacted()).thenReturn(randomValue + 12); when(metadata.getTotalLogFilesSize()).thenReturn(randomValue + 13); when(metadata.getTotalRecordsDeleted()).thenReturn(randomValue + 14); + when(metadata.getTotalCorruptLogBlocks()).thenReturn(randomValue + 15); + when(metadata.getTotalRollbackLogBlocks()).thenReturn(randomValue + 16); when(metadata.getMinAndMaxEventTime()).thenReturn(Pair.of(Option.empty(), Option.empty())); - hoodieMetrics.updateCommitMetrics(randomValue + 15, commitTimer.stop(), metadata, action); + when(config.isCompactionLogBlockMetricsOn()).thenReturn(true); + + hoodieMetrics.updateCommitMetrics(randomValue + 17, commitTimer.stop(), 
metadata, action); String metricname = hoodieMetrics.getMetricsName(action, "duration"); long duration = (Long)metrics.getRegistry().getGauges().get(metricname).getValue(); assertTrue(duration > 0); - metricname = hoodieMetrics.getMetricsName(action, "totalPartitionsWritten"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_PARTITIONS_WRITTEN_STR); assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalPartitionsWritten()); - metricname = hoodieMetrics.getMetricsName(action, "totalFilesInsert"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_FILES_INSERT_STR); assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesInsert()); - metricname = hoodieMetrics.getMetricsName(action, "totalFilesUpdate"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_FILES_UPDATE_STR); assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalFilesUpdated()); - metricname = hoodieMetrics.getMetricsName(action, "totalRecordsWritten"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_RECORDS_WRITTEN_STR); assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalRecordsWritten()); - metricname = hoodieMetrics.getMetricsName(action, "totalUpdateRecordsWritten"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_UPDATE_RECORDS_WRITTEN_STR); assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalUpdateRecordsWritten()); - metricname = hoodieMetrics.getMetricsName(action, "totalInsertRecordsWritten"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_INSERT_RECORDS_WRITTEN_STR); assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalInsertRecordsWritten()); - metricname = 
hoodieMetrics.getMetricsName(action, "totalBytesWritten"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_BYTES_WRITTEN_STR); assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.fetchTotalBytesWritten()); metricname = hoodieMetrics.getMetricsName(action, "commitTime"); - assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), randomValue + 15); - metricname = hoodieMetrics.getMetricsName(action, "totalScanTime"); + assertEquals((long)metrics.getRegistry().getGauges().get(metricname).getValue(), randomValue + 17); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_SCAN_TIME_STR); assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalScanTime()); - metricname = hoodieMetrics.getMetricsName(action, "totalCreateTime"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_CREATE_TIME_STR); assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCreateTime()); - metricname = hoodieMetrics.getMetricsName(action, "totalUpsertTime"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_UPSERT_TIME_STR); assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalUpsertTime()); - metricname = hoodieMetrics.getMetricsName(action, "totalCompactedRecordsUpdated"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_COMPACTED_RECORDS_UPDATED_STR); assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCompactedRecordsUpdated()); - metricname = hoodieMetrics.getMetricsName(action, "totalLogFilesCompacted"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_LOG_FILES_COMPACTED_STR); assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesCompacted()); - metricname = hoodieMetrics.getMetricsName(action, 
"totalLogFilesSize"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_LOG_FILES_SIZE_STR); assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalLogFilesSize()); - metricname = hoodieMetrics.getMetricsName(action, "totalRecordsDeleted"); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_RECORDS_DELETED); assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRecordsDeleted()); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_CORRUPTED_LOG_BLOCKS_STR); + assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalCorruptLogBlocks()); + metricname = hoodieMetrics.getMetricsName(action, HoodieMetrics.TOTAL_ROLLBACK_LOG_BLOCKS_STR); + assertEquals(metrics.getRegistry().getGauges().get(metricname).getValue(), metadata.getTotalRollbackLogBlocks()); }); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java index 1ed8ff241dc..795e6cfe7a6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java @@ -398,6 +398,26 @@ public class HoodieCommitMetadata implements Serializable { return totalUpdateRecords; } + public Long getTotalCorruptLogBlocks() { + Long totalCorruptedLogBlocks = 0L; + for (Map.Entry<String, List<HoodieWriteStat>> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : entry.getValue()) { + totalCorruptedLogBlocks += writeStat.getTotalCorruptLogBlock(); + } + } + return totalCorruptedLogBlocks; + } + + public Long getTotalRollbackLogBlocks() { + Long totalRollbackLogBlocks = 0L; + for (Map.Entry<String, List<HoodieWriteStat>> entry : partitionToWriteStats.entrySet()) { + for (HoodieWriteStat writeStat : 
entry.getValue()) { + totalRollbackLogBlocks += writeStat.getTotalRollbackBlocks(); + } + } + return totalRollbackLogBlocks; + } + public Long getTotalLogFilesSize() { Long totalLogFilesSize = 0L; for (Map.Entry<String, List<HoodieWriteStat>> entry : partitionToWriteStats.entrySet()) {