This is an automated email from the ASF dual-hosted git repository. pwason pushed a commit to branch release-0.14.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit db2129ebb625637038ba6dea3834b0c6d5bcf55a Author: Sagar Sumit <sagarsumi...@gmail.com> AuthorDate: Thu Aug 31 03:04:01 2023 +0530 [HUDI-3727] Add metrics for async indexer (#9559) --- .../apache/hudi/metadata/HoodieMetadataWriteUtils.java | 1 - .../hudi/table/action/index/RunIndexActionExecutor.java | 16 +++++++++++++++- .../org/apache/hudi/metadata/HoodieMetadataMetrics.java | 3 ++- 3 files changed, 17 insertions(+), 3 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index 2078896987d..e73f6fb7bc3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -68,7 +68,6 @@ public class HoodieMetadataWriteUtils { // eventually depend on the number of file groups selected for each partition (See estimateFileGroupCount function) private static final long MDT_MAX_HFILE_SIZE_BYTES = 10 * 1024 * 1024 * 1024L; // 10GB - /** * Create a {@code HoodieWriteConfig} to use for the Metadata Table. This is used by async * indexer only. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java index 9b91167899c..461c525a1d5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java @@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata; import org.apache.hudi.avro.model.HoodieRollbackMetadata; import org.apache.hudi.client.transaction.TransactionManager; import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -35,11 +36,13 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.util.CleanerUtils; import org.apache.hudi.common.util.CollectionUtils; +import org.apache.hudi.common.util.HoodieTimer; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieMetadataException; +import org.apache.hudi.metadata.HoodieMetadataMetrics; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.metadata.MetadataPartitionType; import org.apache.hudi.table.HoodieTable; @@ -90,6 +93,8 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, private static final int MAX_CONCURRENT_INDEXING = 1; private static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000; + private final Option<HoodieMetadataMetrics> metrics; + // we use this to update the latest instant in data timeline that has been indexed in metadata table // this needs to be volatile as it can be updated in the IndexingCheckTask spawned by this executor // assumption is that only one indexer can execute at a time @@ -100,6 +105,11 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) { super(context, config, table, instantTime); this.txnManager = new TransactionManager(config, table.getMetaClient().getFs()); + if (config.getMetadataConfig().enableMetrics()) { + this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieIndexer"))); + } else { + this.metrics = Option.empty(); + } } @Override @@ -143,7 +153,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, // this will only build index upto base instant as generated by the plan, we will be doing catchup later String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant(); LOG.info("Starting Index Building with base instant: " + indexUptoInstant); + HoodieTimer timer = HoodieTimer.start(); metadataWriter.buildMetadataPartitions(context, indexPartitionInfos); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer())); // get remaining instants to catchup List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant); @@ -167,7 +179,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, .collect(Collectors.toList()); } catch (Exception e) { throw new HoodieMetadataException("Failed to index partition " + Arrays.toString(indexPartitionInfos.stream() - .map(entry -> entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray())); + .map(entry -> entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray())); } } else { String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant(); @@ -275,7 +287,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I, new IndexingCatchupTask(metadataWriter, instantsToIndex, metadataCompletedTimestamps, table.getMetaClient(), metadataMetaClient)); try { LOG.info("Starting index catchup task"); + HoodieTimer timer = HoodieTimer.start(); indexingCatchupTaskFuture.get(config.getIndexingCheckTimeoutSeconds(), TimeUnit.SECONDS); + metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.ASYNC_INDEXER_CATCHUP_TIME, timer.endTimer())); } catch (Exception e) { indexingCatchupTaskFuture.cancel(true); throw new HoodieIndexException(String.format("Index catchup failed. Current indexed instant = %s. Aborting!", currentCaughtupInstant), e); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java index 521b55efaed..ca9bf7b0834 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java @@ -69,6 +69,7 @@ public class HoodieMetadataMetrics implements Serializable { public static final String SKIP_TABLE_SERVICES = "skip_table_services"; public static final String TABLE_SERVICE_EXECUTION_STATUS = "table_service_execution_status"; public static final String TABLE_SERVICE_EXECUTION_DURATION = "table_service_execution_duration"; + public static final String ASYNC_INDEXER_CATCHUP_TIME = "async_indexer_catchup_time"; private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataMetrics.class); @@ -126,7 +127,7 @@ public class HoodieMetadataMetrics implements Serializable { return stats; } - protected void updateMetrics(String action, long durationInMs) { + public void updateMetrics(String action, long durationInMs) { if (metricsRegistry == null) { return; }