This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit db2129ebb625637038ba6dea3834b0c6d5bcf55a
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Thu Aug 31 03:04:01 2023 +0530

    [HUDI-3727] Add metrics for async indexer (#9559)
---
 .../apache/hudi/metadata/HoodieMetadataWriteUtils.java   |  1 -
 .../hudi/table/action/index/RunIndexActionExecutor.java  | 16 +++++++++++++++-
 .../org/apache/hudi/metadata/HoodieMetadataMetrics.java  |  3 ++-
 3 files changed, 17 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 2078896987d..e73f6fb7bc3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -68,7 +68,6 @@ public class HoodieMetadataWriteUtils {
   // eventually depend on the number of file groups selected for each partition (See estimateFileGroupCount function)
   private static final long MDT_MAX_HFILE_SIZE_BYTES = 10 * 1024 * 1024 * 1024L; // 10GB
 
-
   /**
    * Create a {@code HoodieWriteConfig} to use for the Metadata Table.  This is used by async
    * indexer only.
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 9b91167899c..461c525a1d5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -35,11 +36,13 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.exception.HoodieMetadataException;
+import org.apache.hudi.metadata.HoodieMetadataMetrics;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.table.HoodieTable;
@@ -90,6 +93,8 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
   private static final int MAX_CONCURRENT_INDEXING = 1;
   private static final int TIMELINE_RELOAD_INTERVAL_MILLIS = 5000;
 
+  private final Option<HoodieMetadataMetrics> metrics;
+
   // we use this to update the latest instant in data timeline that has been indexed in metadata table
   // this needs to be volatile as it can be updated in the IndexingCheckTask spawned by this executor
   // assumption is that only one indexer can execute at a time
@@ -100,6 +105,11 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
   public RunIndexActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, HoodieTable<T, I, K, O> table, String instantTime) {
     super(context, config, table, instantTime);
     this.txnManager = new TransactionManager(config, table.getMetaClient().getFs());
+    if (config.getMetadataConfig().enableMetrics()) {
+      this.metrics = Option.of(new HoodieMetadataMetrics(Registry.getRegistry("HoodieIndexer")));
+    } else {
+      this.metrics = Option.empty();
+    }
   }
 
   @Override
@@ -143,7 +153,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
           // this will only build index upto base instant as generated by the plan, we will be doing catchup later
           String indexUptoInstant = indexPartitionInfos.get(0).getIndexUptoInstant();
           LOG.info("Starting Index Building with base instant: " + indexUptoInstant);
+          HoodieTimer timer = HoodieTimer.start();
           metadataWriter.buildMetadataPartitions(context, indexPartitionInfos);
+          metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
 
           // get remaining instants to catchup
           List<HoodieInstant> instantsToCatchup = getInstantsToCatchup(indexUptoInstant);
@@ -167,7 +179,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
               .collect(Collectors.toList());
         } catch (Exception e) {
           throw new HoodieMetadataException("Failed to index partition " + Arrays.toString(indexPartitionInfos.stream()
-                .map(entry -> entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray()));
+              .map(entry -> entry.getMetadataPartitionPath()).collect(Collectors.toList()).toArray()));
         }
       } else {
         String indexUptoInstant = fileIndexPartitionInfo.getIndexUptoInstant();
@@ -275,7 +287,9 @@ public class RunIndexActionExecutor<T, I, K, O> extends BaseActionExecutor<T, I,
         new IndexingCatchupTask(metadataWriter, instantsToIndex, metadataCompletedTimestamps, table.getMetaClient(), metadataMetaClient));
     try {
       LOG.info("Starting index catchup task");
+      HoodieTimer timer = HoodieTimer.start();
       indexingCatchupTaskFuture.get(config.getIndexingCheckTimeoutSeconds(), TimeUnit.SECONDS);
+      metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.ASYNC_INDEXER_CATCHUP_TIME, timer.endTimer()));
     } catch (Exception e) {
       indexingCatchupTaskFuture.cancel(true);
       throw new HoodieIndexException(String.format("Index catchup failed. Current indexed instant = %s. Aborting!", currentCaughtupInstant), e);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
index 521b55efaed..ca9bf7b0834 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -69,6 +69,7 @@ public class HoodieMetadataMetrics implements Serializable {
   public static final String SKIP_TABLE_SERVICES = "skip_table_services";
   public static final String TABLE_SERVICE_EXECUTION_STATUS = "table_service_execution_status";
   public static final String TABLE_SERVICE_EXECUTION_DURATION = "table_service_execution_duration";
+  public static final String ASYNC_INDEXER_CATCHUP_TIME = "async_indexer_catchup_time";
 
   private static final Logger LOG = LoggerFactory.getLogger(HoodieMetadataMetrics.class);
 
@@ -126,7 +127,7 @@ public class HoodieMetadataMetrics implements Serializable {
     return stats;
   }
 
-  protected void updateMetrics(String action, long durationInMs) {
+  public void updateMetrics(String action, long durationInMs) {
     if (metricsRegistry == null) {
       return;
     }

Reply via email to