This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 059649c51fbb perf(flink): Parse bucket index hash-field config once 
instead of per ... (#18993)
059649c51fbb is described below

commit 059649c51fbbfeca809825548efa486d5ef80bdb
Author: voonhous <[email protected]>
AuthorDate: Wed Jun 17 11:21:45 2026 +0800

    perf(flink): Parse bucket index hash-field config once instead of per ... 
(#18993)
    
    * perf(flink): Parse bucket index hash-field config once instead of per 
record
    
    Flink mirror of #18979 (issue #18978). The bucket-index write paths re-split
    the comma-separated hoodie.bucket.index.hash.field config per record via the
    String overload of BucketIdentifier.getBucketId. Precompute
    KeyGenUtils.getIndexKeyFields(...) once and call the List overload:
    
    - BucketIndexPartitioner / BucketIndexRemotePartitioner: parse once in the
      constructor (factory and constructor signatures stay String-based at this
      step; the final change in this series switches them to List<String>)
    - BucketStreamWriteFunction: parse once in open()
    - BucketBulkInsertWriterHelper.rowWithFileId/getFileId: take List<String>;
      Pipelines parses once before the per-record map stage
    
    Behavior-preserving (same bucket ids).
    
    * refactor(flink): Add OptionsResolver.getIndexKeyFields list accessor
    
    Add OptionsResolver.getIndexKeyFields(Configuration) returning the parsed
    List<String> and use it in BucketStreamWriteFunction.open() instead of
    KeyGenUtils.getIndexKeyFields(OptionsResolver.getIndexKeyField(config)).
    
    * perf(flink): Build NumBucketsFunction once for bulk insert instead of per 
record
    
    BucketBulkInsertWriterHelper.getFileId/rowWithFileId rebuilt
    NumBucketsFunction from three conf.get(...) lookups on every record in the
    bulk-insert map, and its constructor logs per record. Construct it once in
    Pipelines.bulkInsert (and the test wrapper) and pass it through; the map
    closure captures it since it is Serializable, mirroring the sibling
    bucket-index partitioners. Behavior-preserving.
    
    * style(flink): Wrap long getFileId/rowWithFileId signatures under 200 chars
    
    Fix checkstyle LineLength violations introduced when the NumBucketsFunction
    parameter was added; wrap both signatures with paren-aligned continuations.
    
    * review(flink): Use OptionsResolver.getIndexKeyFields in 
Pipelines.bulkInsert
    
    Use the OptionsResolver.getIndexKeyFields(conf) accessor for the parsed list
    in Pipelines.bulkInsert and drop the now-unused KeyGenUtils import; at this
    step the String form is still kept for BucketIndexPartitionerFactory.create
    (the final change in this series passes the parsed list there as well).
    
    * review(flink): Use OptionsResolver.getIndexKeyFields in 
BulkInsertFunctionWrapper
    
    Mirror the Pipelines change in the test wrapper: use
    OptionsResolver.getIndexKeyFields(conf) for the parsed list and drop the
    now-unused indexKeys local and KeyGenUtils import.
    
    * review(flink): pass parsed index key fields list to 
BucketIndexPartitionerFactory.create
---
 .../org/apache/hudi/configuration/OptionsResolver.java   |  7 +++++++
 .../hudi/sink/bucket/BucketBulkInsertWriterHelper.java   | 15 +++++++--------
 .../hudi/sink/bucket/BucketStreamWriteFunction.java      |  9 ++++++---
 .../hudi/sink/partitioner/BucketIndexPartitioner.java    | 12 ++++++++----
 .../sink/partitioner/BucketIndexPartitionerFactory.java  | 10 ++++++----
 .../sink/partitioner/BucketIndexRemotePartitioner.java   | 16 ++++++++++------
 .../main/java/org/apache/hudi/sink/utils/Pipelines.java  | 12 +++++++++---
 .../partitioner/TestBucketIndexPartitionerFactory.java   |  6 ++++--
 .../partitioner/TestBucketIndexRemotePartitioner.java    |  4 +++-
 .../hudi/sink/utils/BulkInsertFunctionWrapper.java       |  7 +++++--
 10 files changed, 65 insertions(+), 33 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 5089b4eea3c7..8853d5844984 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -560,6 +560,13 @@ public class OptionsResolver {
     return conf.getString(FlinkOptions.INDEX_KEY_FIELD.key(), 
getRecordKeyStr(conf));
   }
 
+  /**
+   * Returns the index key fields as a list, parsing the comma-separated 
config value once.
+   */
+  public static List<String> getIndexKeyFields(Configuration conf) {
+    return KeyGenUtils.getIndexKeyFields(getIndexKeyField(conf));
+  }
+
   /**
    * Returns the conflict resolution strategy.
    */
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
index fad1f7e9272b..5c324235251c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketBulkInsertWriterHelper.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.sink.bucket;
 
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.bucket.BucketIdentifier;
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.io.storage.row.HoodieRowDataCreateHandle;
@@ -38,6 +37,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -93,20 +93,19 @@ public class BucketBulkInsertWriterHelper extends 
BulkInsertWriterHelper {
     return new SortOperatorGen(rowType, new String[] {FILE_GROUP_META_FIELD});
   }
 
-  private static String getFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf, 
boolean needFixedFileIdSuffix) {
+  private static String getFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, List<String> indexKeyFields,
+                                  NumBucketsFunction numBucketsFunction, 
boolean needFixedFileIdSuffix) {
     String recordKey = keyGen.getRecordKey(record);
     String partition = keyGen.getPartitionPath(record);
-    NumBucketsFunction numBucketsFunction = new 
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS), 
conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE),
-        conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
-
     final int numBuckets = numBucketsFunction.getNumBuckets(partition);
-    final int bucketNum = BucketIdentifier.getBucketId(recordKey, indexKeys, 
numBuckets);
+    final int bucketNum = BucketIdentifier.getBucketId(recordKey, 
indexKeyFields, numBuckets);
     String bucketId = partition + bucketNum;
     return bucketIdToFileId.computeIfAbsent(bucketId, k -> 
needFixedFileIdSuffix ? BucketIdentifier.newBucketFileIdForNBCC(bucketNum) : 
BucketIdentifier.newBucketFileIdPrefix(bucketNum));
   }
 
-  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, String indexKeys, Configuration conf, 
boolean needFixedFileIdSuffix) {
-    final String fileId = getFileId(bucketIdToFileId, keyGen, record, 
indexKeys, conf, needFixedFileIdSuffix);
+  public static RowData rowWithFileId(Map<String, String> bucketIdToFileId, 
RowDataKeyGen keyGen, RowData record, List<String> indexKeyFields,
+                                      NumBucketsFunction numBucketsFunction, 
boolean needFixedFileIdSuffix) {
+    final String fileId = getFileId(bucketIdToFileId, keyGen, record, 
indexKeyFields, numBucketsFunction, needFixedFileIdSuffix);
     return GenericRowData.of(StringData.fromString(fileId), record);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 949753ef8094..280c25fb9947 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -41,6 +41,7 @@ import org.apache.flink.util.Collector;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -56,7 +57,9 @@ public class BucketStreamWriteFunction extends 
StreamWriteFunction {
 
   private int parallelism;
 
-  private String indexKeyFields;
+  // parsed once in open(); the per-record defineRecordLocation path uses the 
List overload of
+  // getBucketId so the comma-separated config string is not re-split per 
record
+  private List<String> indexKeyFieldList;
 
   private boolean isNonBlockingConcurrencyControl;
 
@@ -102,7 +105,7 @@ public class BucketStreamWriteFunction extends 
StreamWriteFunction {
   @Override
   public void open(Configuration parameters) throws IOException {
     super.open(parameters);
-    this.indexKeyFields = OptionsResolver.getIndexKeyField(config);
+    this.indexKeyFieldList = OptionsResolver.getIndexKeyFields(config);
     this.isNonBlockingConcurrencyControl = 
OptionsResolver.isNonBlockingConcurrencyControl(config);
     this.taskID = 
RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
     this.parallelism = 
RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
@@ -144,7 +147,7 @@ public class BucketStreamWriteFunction extends 
StreamWriteFunction {
       bootstrapIndexIfNeed(partition);
     }
     Map<Integer, String> bucketToFileId = 
bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
-    final int bucketNum = BucketIdentifier.getBucketId(record.getRecordKey(), 
indexKeyFields, numBucketsFunction.getNumBuckets(record.getPartitionPath()));
+    final int bucketNum = BucketIdentifier.getBucketId(record.getRecordKey(), 
indexKeyFieldList, numBucketsFunction.getNumBuckets(record.getPartitionPath()));
     final String bucketId = partition + "/" + bucketNum;
 
     if (incBucketIndex.contains(bucketId)) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
index fb5b3fdb6f0f..d63ebbbed86a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitioner.java
@@ -28,6 +28,8 @@ import 
org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.List;
+
 /**
  * Bucket index input partitioner.
  * The fields to hash can be a subset of the primary key fields.
@@ -36,13 +38,15 @@ import org.apache.flink.configuration.Configuration;
  */
 public class BucketIndexPartitioner<T extends HoodieKey> implements 
Partitioner<T> {
 
-  private final String indexKeyFields;
+  // Index key fields, pre-parsed by the caller. The per-record partition() 
path uses the List
+  // overload of getBucketId so the comma-separated config string is never 
re-split per record.
+  private final List<String> indexKeyFieldList;
   private final NumBucketsFunction numBucketsFunction;
 
   private Functions.Function3<Integer, String, Integer, Integer> 
partitionIndexFunc;
 
-  public BucketIndexPartitioner(Configuration conf, String indexKeyFields) {
-    this.indexKeyFields = indexKeyFields;
+  public BucketIndexPartitioner(Configuration conf, List<String> 
indexKeyFieldList) {
+    this.indexKeyFieldList = indexKeyFieldList;
     this.numBucketsFunction = new 
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
         conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), 
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
   }
@@ -53,7 +57,7 @@ public class BucketIndexPartitioner<T extends HoodieKey> 
implements Partitioner<
       this.partitionIndexFunc = 
BucketIndexUtil.getPartitionIndexFunc(numPartitions);
     }
     int numBuckets = numBucketsFunction.getNumBuckets(key.getPartitionPath());
-    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), 
indexKeyFields, numBuckets);
+    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), 
indexKeyFieldList, numBuckets);
     return this.partitionIndexFunc.apply(numBuckets, key.getPartitionPath(), 
curBucket);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
index eb194643ae1d..3943e8455a10 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexPartitionerFactory.java
@@ -24,6 +24,8 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.List;
+
 /**
  * Factory for simple bucket index partitioners.
  */
@@ -33,12 +35,12 @@ public class BucketIndexPartitionerFactory {
   }
 
   public static Partitioner<HoodieKey> create(Configuration conf) {
-    return create(conf, OptionsResolver.getIndexKeyField(conf));
+    return create(conf, OptionsResolver.getIndexKeyFields(conf));
   }
 
-  public static Partitioner<HoodieKey> create(Configuration conf, String 
indexKeyFields) {
+  public static Partitioner<HoodieKey> create(Configuration conf, List<String> 
indexKeyFieldList) {
     return OptionsResolver.shouldUseBucketRemotePartitioner(conf)
-        ? new BucketIndexRemotePartitioner<>(conf, indexKeyFields)
-        : new BucketIndexPartitioner<>(conf, indexKeyFields);
+        ? new BucketIndexRemotePartitioner<>(conf, indexKeyFieldList)
+        : new BucketIndexPartitioner<>(conf, indexKeyFieldList);
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
index 0ef8378c3519..db70152a2a70 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketIndexRemotePartitioner.java
@@ -30,6 +30,8 @@ import org.apache.hudi.util.ViewStorageProperties;
 import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.List;
+
 /**
  * Bucket index input partitioner backed by the embedded timeline service.
  *
@@ -38,20 +40,22 @@ import org.apache.flink.configuration.Configuration;
 public class BucketIndexRemotePartitioner<T extends HoodieKey> implements 
Partitioner<T> {
 
   private final Configuration conf;
-  private final String indexKeyFields;
+  // Index key fields, pre-parsed by the caller. The per-record partition() 
path uses the List
+  // overload of getBucketId so the comma-separated config string is never 
re-split per record.
+  private final List<String> indexKeyFieldList;
   private final NumBucketsFunction numBucketsFunction;
 
   private transient RemotePartitionHelper remotePartitionHelper;
 
-  public BucketIndexRemotePartitioner(Configuration conf, String 
indexKeyFields) {
+  public BucketIndexRemotePartitioner(Configuration conf, List<String> 
indexKeyFieldList) {
     this.conf = conf;
-    this.indexKeyFields = indexKeyFields;
+    this.indexKeyFieldList = indexKeyFieldList;
     this.numBucketsFunction = new 
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
         conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), 
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
   }
 
-  BucketIndexRemotePartitioner(Configuration conf, String indexKeyFields, 
RemotePartitionHelper remotePartitionHelper) {
-    this(conf, indexKeyFields);
+  BucketIndexRemotePartitioner(Configuration conf, List<String> 
indexKeyFieldList, RemotePartitionHelper remotePartitionHelper) {
+    this(conf, indexKeyFieldList);
     this.remotePartitionHelper = remotePartitionHelper;
   }
 
@@ -59,7 +63,7 @@ public class BucketIndexRemotePartitioner<T extends 
HoodieKey> implements Partit
   public int partition(T key, int numPartitions) {
     String partitionPath = normalizePartitionPath(key.getPartitionPath());
     int numBuckets = numBucketsFunction.getNumBuckets(partitionPath);
-    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), 
indexKeyFields, numBuckets);
+    int curBucket = BucketIdentifier.getBucketId(key.getRecordKey(), 
indexKeyFieldList, numBuckets);
     return doGetRemotePartition(getRemotePartitionHelper(), numBuckets, 
partitionPath, curBucket, numPartitions);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 8c0975c9943d..7caacf49cf64 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -28,6 +28,7 @@ import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.sink.CleanFunction;
 import org.apache.hudi.sink.StreamWriteOperator;
 import org.apache.hudi.sink.append.AppendWriteFunctions;
@@ -85,6 +86,7 @@ import org.apache.flink.table.types.logical.RowType;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Supplier;
@@ -139,8 +141,12 @@ public class Pipelines {
         throw new HoodieException(
             "Consistent hashing bucket index does not work with bulk insert 
using FLINK engine. Use simple bucket index or Spark engine.");
       }
-      String indexKeys = OptionsResolver.getIndexKeyField(conf);
-      Partitioner<HoodieKey> partitioner = 
BucketIndexPartitionerFactory.create(conf, indexKeys);
+      List<String> indexKeyFieldList = OptionsResolver.getIndexKeyFields(conf);
+      // built once and captured by the per-record map closure 
(NumBucketsFunction is Serializable),
+      // avoiding a per-record rebuild from conf inside 
BucketBulkInsertWriterHelper
+      NumBucketsFunction numBucketsFunction = new 
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+          conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), 
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
+      Partitioner<HoodieKey> partitioner = 
BucketIndexPartitionerFactory.create(conf, indexKeyFieldList);
       RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
       RowType rowTypeWithFileId = 
BucketBulkInsertWriterHelper.rowTypeWithFileId(rowType);
       InternalTypeInfo<RowData> typeInfo = 
InternalTypeInfo.of(rowTypeWithFileId);
@@ -148,7 +154,7 @@ public class Pipelines {
 
       Map<String, String> bucketIdToFileId = new HashMap<>();
       dataStream = dataStream.partitionCustom(partitioner, 
keyGen::getHoodieKey)
-          .map(record -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, 
indexKeys, conf, needFixedFileIdSuffix), typeInfo)
+          .map(record -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, record, 
indexKeyFieldList, numBucketsFunction, needFixedFileIdSuffix), typeInfo)
           .setParallelism(PARALLELISM_VALUE);
       if (conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         SortOperatorGen sortOperatorGen = 
BucketBulkInsertWriterHelper.getFileIdSorterGen(rowTypeWithFileId);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
index b07cc4f434ef..9aad1821a224 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexPartitionerFactory.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 
 /**
@@ -36,7 +38,7 @@ class TestBucketIndexPartitionerFactory {
 
   @Test
   void testCreateLocalBucketIndexPartitioner() {
-    Partitioner<HoodieKey> partitioner = 
BucketIndexPartitionerFactory.create(getSimpleBucketConf(), "id");
+    Partitioner<HoodieKey> partitioner = 
BucketIndexPartitionerFactory.create(getSimpleBucketConf(), 
Collections.singletonList("id"));
 
     assertInstanceOf(BucketIndexPartitioner.class, partitioner);
   }
@@ -46,7 +48,7 @@ class TestBucketIndexPartitionerFactory {
     Configuration conf = getSimpleBucketConf();
     conf.setString(HoodieIndexConfig.BUCKET_PARTITIONER.key(), "true");
 
-    Partitioner<HoodieKey> partitioner = 
BucketIndexPartitionerFactory.create(conf, "id");
+    Partitioner<HoodieKey> partitioner = 
BucketIndexPartitionerFactory.create(conf, Collections.singletonList("id"));
 
     assertInstanceOf(BucketIndexRemotePartitioner.class, partitioner);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
index b693228a796b..b3dda319ea9e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketIndexRemotePartitioner.java
@@ -28,6 +28,8 @@ import 
org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.flink.configuration.Configuration;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
@@ -91,7 +93,7 @@ class TestBucketIndexRemotePartitioner {
     when(remotePartitionHelper.getPartition(8, "", currentBucket, 
16)).thenReturn(11);
 
     BucketIndexRemotePartitioner<HoodieKey> partitioner =
-        new BucketIndexRemotePartitioner<>(conf, "id", remotePartitionHelper);
+        new BucketIndexRemotePartitioner<>(conf, 
Collections.singletonList("id"), remotePartitionHelper);
 
     assertEquals(11, partitioner.partition(key, 16));
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
index 159ead016fb5..1d5cc2fc9a97 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/BulkInsertFunctionWrapper.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
 import org.apache.hudi.sink.StreamWriteOperatorCoordinator;
 import org.apache.hudi.sink.bucket.BucketBulkInsertWriterHelper;
 import org.apache.hudi.sink.bulk.BulkInsertWriteFunction;
@@ -213,10 +214,12 @@ public class BulkInsertFunctionWrapper<I> implements 
TestFunctionWrapper<I> {
 
   private void setupMapFunction() {
     RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, rowType);
-    String indexKeys = OptionsResolver.getIndexKeyField(conf);
+    List<String> indexKeyFieldList = OptionsResolver.getIndexKeyFields(conf);
+    NumBucketsFunction numBucketsFunction = new 
NumBucketsFunction(conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_EXPRESSIONS),
+        conf.get(FlinkOptions.BUCKET_INDEX_PARTITION_RULE), 
conf.get(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS));
     boolean needFixedFileIdSuffix = 
OptionsResolver.isNonBlockingConcurrencyControl(conf);
     this.bucketIdToFileId = new HashMap<>();
-    this.mapFunction = r -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r, 
indexKeys, conf, needFixedFileIdSuffix);
+    this.mapFunction = r -> 
BucketBulkInsertWriterHelper.rowWithFileId(bucketIdToFileId, keyGen, r, 
indexKeyFieldList, numBucketsFunction, needFixedFileIdSuffix);
   }
 
   private void setupSortOperator() throws Exception {

Reply via email to