This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch perf-bucket-index-key-fields-parse-once
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 29a0eadb19c7ece4e3ef22335f0cab05cb57fff9
Author: voon <[email protected]>
AuthorDate: Thu Jun 11 17:30:01 2026 +0800

    perf(spark): Parse bucket index hash-field config once instead of per record
    
    The String overload of BucketIdentifier.getBucketId re-parses the
    comma-separated hash-field config on every call (split, per-token trim,
    empty filter, new list). Precompute the field list once per partitioner
    or writer with KeyGenUtils.getIndexKeyFields, the exact parser the
    String overload uses, and call the existing List overloads: bucket ids
    are bit-identical since the downstream chain is unchanged, mirroring
    the precedent in HoodieBucketIndex.
    
    Covers the upsert shuffle (SparkBucketIndexPartitioner), the row-writer
    repartition closure (BucketPartitionUtils) and the bucket bulk-insert
    write path (BucketBulkInsertDataInternalWriterHelper and the
    consistent-hashing variant).
    
    Measured on the two existing overloads (JDK 17, single-field key, 10M
    iterations): ~62 ns/op with the per-call parse vs ~4 ns/op with the
    precomputed list.
---
 .../commit/BucketBulkInsertDataInternalWriterHelper.java      | 11 ++++++++---
 .../ConsistentBucketBulkInsertDataInternalWriterHelper.java   |  2 +-
 .../hudi/table/action/commit/SparkBucketIndexPartitioner.java |  9 ++++++---
 .../scala/org/apache/spark/sql/BucketPartitionUtils.scala     |  6 +++++-
 4 files changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
index 15d973743fd1..6a3c5dd4912c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -25,6 +25,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 import org.apache.hudi.table.HoodieTable;
 
@@ -35,6 +36,7 @@ import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -47,7 +49,9 @@ public class BucketBulkInsertDataInternalWriterHelper extends 
BulkInsertDataInte
   private Pair<UTF8String, Integer> lastFileId; // for efficient code path
   // p -> (fileId -> handle)
   private final Map<Pair<UTF8String, Integer>, HoodieRowCreateHandle> handles;
-  protected final String indexKeyFields;
+  // parsed once; the per-row write path uses the List overloads so the 
comma-separated config
+  // string is not re-split per row
+  protected final List<String> indexKeyFieldList;
   protected final int bucketNum;
   private final boolean isNonBlockingConcurrencyControl;
   private final NumBucketsFunction numBucketsFunction;
@@ -62,7 +66,8 @@ public class BucketBulkInsertDataInternalWriterHelper extends 
BulkInsertDataInte
                                                   String instantTime, int 
taskPartitionId, long taskId, long taskEpochId, StructType structType,
                                                   boolean populateMetaFields, 
boolean arePartitionRecordsSorted, boolean shouldPreserveHoodieMetadata) {
     super(hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, 
taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted, 
shouldPreserveHoodieMetadata);
-    this.indexKeyFields = 
writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, 
writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
+    this.indexKeyFieldList = KeyGenUtils.getIndexKeyFields(
+        
writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, 
writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key())));
     this.bucketNum = 
writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
     this.handles = new HashMap<>();
     this.isNonBlockingConcurrencyControl = 
writeConfig.isNonBlockingConcurrencyControl();
@@ -73,7 +78,7 @@ public class BucketBulkInsertDataInternalWriterHelper extends 
BulkInsertDataInte
     try {
       UTF8String partitionPath = extractPartitionPath(row);
       UTF8String recordKey = extractRecordKey(row);
-      int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), 
indexKeyFields, numBucketsFunction.getNumBuckets(partitionPath.toString()));
+      int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), 
indexKeyFieldList, numBucketsFunction.getNumBuckets(partitionPath.toString()));
       if (lastFileId == null || !Objects.equals(lastFileId.getKey(), 
partitionPath) || !Objects.equals(lastFileId.getValue(), bucketId)) {
         // NOTE: It's crucial to make a copy here, since [[UTF8String]] could 
be pointing into
         //       a mutable underlying buffer
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java
index 9072e32939d7..19c11caa1369 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/ConsistentBucketBulkInsertDataInternalWriterHelper.java
@@ -75,7 +75,7 @@ public class 
ConsistentBucketBulkInsertDataInternalWriterHelper extends BucketBu
 
   private HoodieRowCreateHandle getBucketRowCreateHandle(String partitionPath, 
String recordKey) {
     ConsistentBucketIdentifier identifier = getBucketIdentifier(partitionPath);
-    final ConsistentHashingNode node = identifier.getBucket(recordKey, 
indexKeyFields);
+    final ConsistentHashingNode node = identifier.getBucket(recordKey, 
indexKeyFieldList);
     String fileId = FSUtils.createNewFileId(node.getFileIdPrefix(), 0);
 
     ValidationUtils.checkArgument(node.getTag() != 
ConsistentHashingNode.NodeTag.NORMAL
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
index a0f778f17e6f..db7261eb8286 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/SparkBucketIndexPartitioner.java
@@ -28,6 +28,7 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.index.bucket.HoodieBucketIndex;
+import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
@@ -52,7 +53,9 @@ public class SparkBucketIndexPartitioner<T> extends
     SparkHoodiePartitioner<T> {
 
   private final int numBuckets;
-  private final String indexKeyField;
+  // parsed once; the per-record getPartition path uses the List overload of 
getBucketId so the
+  // comma-separated config string is not re-split per record
+  private final List<String> indexKeyFieldList;
   private final int totalPartitionPaths;
   private final List<String> partitionPaths;
   /**
@@ -80,7 +83,7 @@ public class SparkBucketIndexPartitioner<T> extends
               + table.getIndex().getClass().getSimpleName());
     }
     this.numBuckets = ((HoodieBucketIndex) table.getIndex()).getNumBuckets();
-    this.indexKeyField = config.getBucketIndexHashField();
+    this.indexKeyFieldList = 
KeyGenUtils.getIndexKeyFields(config.getBucketIndexHashField());
     this.totalPartitionPaths = profile.getPartitionPaths().size();
     partitionPaths = new ArrayList<>(profile.getPartitionPaths());
     partitionPathOffset = new HashMap<>();
@@ -129,7 +132,7 @@ public class SparkBucketIndexPartitioner<T> extends
     Option<HoodieRecordLocation> location = keyLocation._2;
     int bucketId = location.isPresent()
         ? BucketIdentifier.bucketIdFromFileId(location.get().getFileId())
-        : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), 
indexKeyField, numBuckets);
+        : BucketIdentifier.getBucketId(keyLocation._1.getRecordKey(), 
indexKeyFieldList, numBuckets);
     return partitionPathOffset.get(partitionPath) + bucketId;
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
index da7e8c682e4e..14aff571c238 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/BucketPartitionUtils.scala
@@ -25,16 +25,20 @@ import org.apache.hudi.common.util.{Functions, 
RemotePartitionHelper}
 import org.apache.hudi.common.util.hash.BucketIndexUtil
 import org.apache.hudi.index.bucket.BucketIdentifier
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction
+import org.apache.hudi.keygen.KeyGenUtils
 
 import org.apache.spark.Partitioner
 import org.apache.spark.sql.catalyst.InternalRow
 
 object BucketPartitionUtils extends SparkAdapterSupport {
   def createDataFrame(df: DataFrame, indexKeyFields: String, 
numBucketsFunction: NumBucketsFunction, partitioner: Partitioner): DataFrame = {
+    // parse the comma-separated config once outside the per-row closure; the 
list is a
+    // serializable java.util.List, safe to capture
+    val indexKeyFieldList = KeyGenUtils.getIndexKeyFields(indexKeyFields)
     def getPartitionKeyExtractor(): InternalRow => (String, Int) = row => {
       val partition = row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD)
       val kb = BucketIdentifier
-        .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), 
indexKeyFields, numBucketsFunction.getNumBuckets(partition))
+        .getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD), 
indexKeyFieldList, numBucketsFunction.getNumBuckets(partition))
 
       if (partition == null || partition.trim.isEmpty) {
         ("", kb)

Reply via email to