This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3525983c90e [HUDI-7624] Fixing source read and index tagging duration (#12789)
3525983c90e is described below

commit 3525983c90eb2fd02682596ada304ba11897290e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Feb 18 21:49:28 2025 -0800

    [HUDI-7624] Fixing source read and index tagging duration (#12789)
    
    
    ---------
    
    Co-authored-by: Rajesh Mahindra <[email protected]>
---
 .../action/commit/BaseCommitActionExecutor.java    |  5 +++++
 .../hudi/table/action/commit/BaseWriteHelper.java  |  5 ++++-
 .../table/action/commit/HoodieDeleteHelper.java    |  5 ++++-
 .../org/apache/hudi/metrics/TestHoodieMetrics.java |  9 +++++++++
 .../commit/BaseFlinkCommitActionExecutor.java      |  6 ++++++
 .../commit/BaseJavaCommitActionExecutor.java       |  7 ++++++-
 .../commit/BaseSparkCommitActionExecutor.java      | 22 +++++++++++++++-------
 7 files changed, 49 insertions(+), 10 deletions(-)
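
For readers scanning the diff below: the change threads an optional timer from the write and delete helpers into each engine-specific commit action executor, so the time spent reading source records, de-duplicating, and tagging them against the index can be surfaced as a metric. The following is a minimal sketch of that overload pattern, not Hudi's actual API: java.util.Optional and System.nanoTime() stand in for Hudi's Option and HoodieTimer, and every class name here is hypothetical.

    import java.util.Optional;

    public class TimerThreadingSketch {
      // Hypothetical stand-in for HoodieTimer: elapsed wall-clock millis.
      static final class StopWatch {
        private final long startNanos = System.nanoTime();
        long endTimerMs() {
          return (System.nanoTime() - startNanos) / 1_000_000L;
        }
      }

      // Hypothetical stand-in for BaseCommitActionExecutor.
      abstract static class CommitExecutor<I, O> {
        abstract O execute(I inputRecords);

        // New overload with a default body that ignores the timer, so
        // executors that do not report the metric behave exactly as before.
        O execute(I inputRecords, Optional<StopWatch> sourceReadAndIndexTimer) {
          return execute(inputRecords);
        }
      }

      // Caller side, mirroring BaseWriteHelper: start the timer before
      // dedup and index tagging, then hand it to the executor, which is
      // responsible for stopping it.
      static <I, O> O write(I records, CommitExecutor<I, O> executor) {
        StopWatch sourceReadAndIndexTimer = new StopWatch();
        // ... de-dupe and tag records against the index here ...
        return executor.execute(records, Optional.of(sourceReadAndIndexTimer));
      }
    }

The default-delegating overload is what lets the Flink, Java, and Spark executors opt in independently, as the hunks below show.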

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index eff9dd8bf65..255dfab4460 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.StringUtils;
@@ -115,6 +116,10 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
 
   public abstract HoodieWriteMetadata<O> execute(I inputRecords);
 
+  public HoodieWriteMetadata<O> execute(I inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
+    return this.execute(inputRecords);
+  }
+
   /**
    * Save the workload profile in an intermediate file (here re-using commit files) This is useful when performing
    * rollback for MOR tables. Only updates are recorded in the workload profile metadata since updates to log blocks
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index ff47b636098..5d6bb6048e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.function.SerializableFunctionUnchecked;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieUpsertException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
@@ -44,6 +46,7 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
                                       BaseCommitActionExecutor<T, I, K, O, R> executor,
                                       WriteOperationType operationType) {
     try {
+      HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start();
       // De-dupe/merge if needed
       I dedupedRecords =
           combineOnCondition(shouldCombine, inputRecords, configuredShuffleParallelism, table);
@@ -55,7 +58,7 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
         taggedRecords = tag(dedupedRecords, context, table);
       }
 
-      HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
+      HoodieWriteMetadata<O> result = executor.execute(taggedRecords, Option.of(sourceReadAndIndexTimer));
       return result;
     } catch (Throwable e) {
       if (e instanceof HoodieUpsertException) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index 17dd4282e14..413e77f87da 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieUpsertException;
@@ -79,6 +81,7 @@ public class HoodieDeleteHelper<T, R> extends
                                                              HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
                                                              BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
     try {
+      HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start();
       int targetParallelism =
           deduceShuffleParallelism((HoodieData) keys, config.getDeleteShuffleParallelism());
 
@@ -100,7 +103,7 @@ public class HoodieDeleteHelper<T, R> extends
       HoodieData<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
       HoodieWriteMetadata<HoodieData<WriteStatus>> result;
       if (!taggedValidRecords.isEmpty()) {
-        result = deleteExecutor.execute(taggedValidRecords);
+        result = deleteExecutor.execute(taggedValidRecords, Option.of(sourceReadAndIndexTimer));
         result.setIndexLookupDuration(tagLocationDuration);
       } else {
         // if entire set of keys are non existent
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index ee5dba93b00..7da3c9479b7 100755
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -44,6 +44,7 @@ import java.util.UUID;
 import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.metrics.HoodieMetrics.SOURCE_READ_AND_INDEX_ACTION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -127,6 +128,14 @@ public class TestHoodieMetrics {
       }
     }
 
+    // PreWrite metrics
+    timer = hoodieMetrics.getSourceReadAndIndexTimerCtx();
+    Thread.sleep(5); // Ensure timer duration is > 0
+    hoodieMetrics.updateSourceReadAndIndexMetrics("some_action", hoodieMetrics.getDurationInMs(timer.stop()));
+    metricName = hoodieMetrics.getMetricsName(SOURCE_READ_AND_INDEX_ACTION, "some_action.duration");
+    msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
+    assertTrue(msec > 0);
+
     // Rollback metrics
     timer = hoodieMetrics.getRollbackCtx();
     Thread.sleep(5); // Ensure timer duration is > 0
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index e18052f002a..e285657dead 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -90,6 +91,11 @@ public abstract class BaseFlinkCommitActionExecutor<T> extends
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
+    return execute(inputRecords, Option.empty());
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
     List<WriteStatus> writeStatuses = new LinkedList<>();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 2a9c2b86024..3d503239c33 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -86,12 +87,16 @@ public abstract class BaseJavaCommitActionExecutor<T> extends
 
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
+    return execute(inputRecords, Option.empty());
+  }
+
+  @Override
+  public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
     HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
 
     WorkloadProfile workloadProfile =
         new WorkloadProfile(buildProfile(inputRecords), table.getIndex().canIndexLogFiles());
     LOG.info("Input workload profile :" + workloadProfile);
-
     final Partitioner partitioner = getPartitioner(workloadProfile);
     try {
       saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 98f1469e5ee..183d97f8ce7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -148,13 +148,18 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
+    return this.execute(inputRecords, Option.empty());
+  }
+
+  @Override
+  public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
     // Cache the tagged records, so we don't end up computing both
     JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
     if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
       HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(),
           context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
     } else {
-      LOG.info("RDD PreppedRecords was persisted at: " + 
inputRDD.getStorageLevel());
+      LOG.info("RDD PreppedRecords was persisted at: {}", 
inputRDD.getStorageLevel());
     }
 
     // Handle records update with clustering
@@ -162,13 +167,14 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
     LOG.info("Num spark partitions for inputRecords before triggering workload profile {}", inputRecordsWithClusteringUpdate.getNumPartitions());
 
     context.setJobStatus(this.getClass().getSimpleName(), "Building workload profile:" + config.getTableName());
-    HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start(); // time taken from dedup -> tag location -> building workload profile
     WorkloadProfile workloadProfile =
         new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
-    LOG.debug("Input workload profile :" + workloadProfile);
-    long sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.endTimer();
-    LOG.info("Source read and index timer " + sourceReadAndIndexDurationMs);
-
+    LOG.debug("Input workload profile :{}", workloadProfile);
+    Long sourceReadAndIndexDurationMs = null;
+    if (sourceReadAndIndexTimer.isPresent()) {
+      sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.get().endTimer();
+      LOG.info("Source read and index timer {}", sourceReadAndIndexDurationMs);
+    }
     // partition using the insert partitioner
     final Partitioner partitioner = getPartitioner(workloadProfile);
     saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
@@ -177,7 +183,9 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
     HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
     updateIndexAndCommitIfNeeded(writeStatuses, result);
-    result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
+    if (sourceReadAndIndexTimer.isPresent()) {
+      result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
+    }
     return result;
   }
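
On the consuming side (mirroring the BaseSparkCommitActionExecutor hunk above), the override ends the timer once the workload profile has been built and sets the duration on the result only when a timer was actually supplied. A hedged continuation of the earlier sketch, reusing its hypothetical StopWatch stand-in, with WriteResult standing in for HoodieWriteMetadata:

    import java.util.Optional;

    class SparkLikeExecutorSketch<I> {
      // Hypothetical stand-in for HoodieWriteMetadata.
      static final class WriteResult {
        Long sourceReadAndIndexDurationMs; // stays null when no timer was passed
      }

      WriteResult execute(I inputRecords,
                          Optional<TimerThreadingSketch.StopWatch> sourceReadAndIndexTimer) {
        // ... cache tagged records and build the workload profile; this is
        // the point where source read + index tagging is considered done ...
        Long durationMs = null;
        if (sourceReadAndIndexTimer.isPresent()) {
          durationMs = sourceReadAndIndexTimer.get().endTimerMs();
        }
        // ... partition, write the records, and update the index ...
        WriteResult result = new WriteResult();
        if (durationMs != null) {
          result.sourceReadAndIndexDurationMs = durationMs;
        }
        return result;
      }
    }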
 
