This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d58ee4b1f1 [HUDI-5994] Bucket index supports bulk insert row writer 
(#8776)
9d58ee4b1f1 is described below

commit 9d58ee4b1f1fce213ee3f4ff478eb003da943924
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 24 20:09:54 2023 +0800

    [HUDI-5994] Bucket index supports bulk insert row writer (#8776)
    
    Co-authored-by: y00617041 <[email protected]>
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../apache/hudi/index/bucket/BucketIdentifier.java |   4 +
 .../BucketBulkInsertPartitionerWithRows.java       |  81 ++++++++++++++
 .../BucketBulkInsertDataInternalWriterHelper.java  | 124 +++++++++++++++++++++
 .../commit/BulkInsertDataInternalWriterHelper.java |  44 ++++----
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  15 ++-
 .../apache/spark/sql/hudi/TestInsertTable.scala    |  56 ++++++++++
 .../HoodieBulkInsertDataInternalWriter.java        |   7 +-
 8 files changed, 307 insertions(+), 28 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e013da5f3a1..878d185af06 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1944,6 +1944,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD);
   }
 
+  public String getBucketIndexHashFieldWithDefault() {
+    return getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, 
getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
+  }
+
   /**
    * storage properties.
    */
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index b8d104e2c57..088e851fd8d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -91,6 +91,10 @@ public class BucketIdentifier implements Serializable {
     return newBucketFileIdPrefix(bucketIdStr(bucketId));
   }
 
+  public static String newBucketFileIdPrefix(String fileId, int bucketId) {
+    return fileId.replaceFirst(".{8}", bucketIdStr(bucketId));
+  }
+
   public static String newBucketFileIdPrefix(String bucketId) {
     return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
new file mode 100644
index 00000000000..3efd3f61d40
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import scala.Tuple2;
+
+/**
+ * Bulk_insert partitioner of Spark row using bucket index.
+ */
+public class BucketBulkInsertPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
+
+  private final String indexKeyFields;
+  private final int bucketNum;
+
+  public BucketBulkInsertPartitionerWithRows(String indexKeyFields, int 
bucketNum) {
+    this.indexKeyFields = indexKeyFields;
+    this.bucketNum = bucketNum;
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputPartitions) {
+    Partitioner partitioner = new Partitioner() {
+      @Override
+      public int getPartition(Object key) {
+        return (Integer) key;
+      }
+
+      @Override
+      public int numPartitions() {
+        return outputPartitions;
+      }
+    };
+
+    JavaRDD<Row> rddRows = rows.toJavaRDD()
+        .mapToPair(row -> new Tuple2<>(getPartitionKey(row, 
this.indexKeyFields, this.bucketNum, outputPartitions), row))
+        .partitionBy(partitioner)
+        .values();
+    return rows.sparkSession().createDataFrame(rddRows, rows.schema());
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return false;
+  }
+
+  private static int getPartitionKey(Row row, String indexKeyFields, int 
bucketNum, int partitionNum) {
+    int bucketId = 
BucketIdentifier.getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD),
 indexKeyFields, bucketNum);
+    String partition = 
row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
+    if (partition == null || partition.trim().isEmpty()) {
+      return bucketId;
+    } else {
+      int pw = (partition.hashCode() & Integer.MAX_VALUE) % partitionNum;
+      return BucketIdentifier.mod(bucketId + pw, partitionNum);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
new file mode 100644
index 00000000000..5c0816eb255
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Helper class for native row writer for bulk_insert with bucket index.
+ */
+public class BucketBulkInsertDataInternalWriterHelper extends 
BulkInsertDataInternalWriterHelper {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BucketBulkInsertDataInternalWriterHelper.class);
+
+  private int lastKnownBucketNum = -1;
+  // p -> (bucketNum -> handle)
+  private final Map<String, Map<Integer, HoodieRowCreateHandle>> bucketHandles;
+  private final String indexKeyFields;
+  private final int bucketNum;
+
+  public BucketBulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
+                                                  String instantTime, int 
taskPartitionId, long taskId, long taskEpochId, StructType structType,
+                                                  boolean populateMetaFields, 
boolean arePartitionRecordsSorted) {
+    super(hoodieTable, writeConfig, instantTime, taskPartitionId, taskId, 
taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
+    this.indexKeyFields = 
writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD, 
writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
+    this.bucketNum = 
writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
+    this.bucketHandles = new HashMap<>();
+  }
+
+  public void write(InternalRow row) throws IOException {
+    try {
+      UTF8String partitionPath = extractPartitionPath(row);
+      UTF8String recordKey = extractRecordKey(row);
+      int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey), 
indexKeyFields, bucketNum);
+      if (lastKnownPartitionPath == null || 
!Objects.equals(lastKnownPartitionPath, partitionPath) || !handle.canWrite() || 
bucketId != lastKnownBucketNum) {
+        handle = getBucketRowCreateHandle(String.valueOf(partitionPath), 
bucketId);
+        // NOTE: It's crucial to make a copy here, since [[UTF8String]] could 
be pointing into
+        //       a mutable underlying buffer
+        lastKnownPartitionPath = partitionPath.clone();
+        lastKnownBucketNum = bucketId;
+      }
+
+      handle.write(row);
+    } catch (Throwable t) {
+      LOG.error("Global error thrown while trying to write records in 
HoodieRowCreateHandle ", t);
+      throw t;
+    }
+  }
+
+  private UTF8String extractRecordKey(InternalRow row) {
+    if (populateMetaFields) {
+      // In case meta-fields are materialized w/in the table itself, we can 
just simply extract
+      // partition path from there
+      //
+      // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as 
[[UTF8String]] to avoid
+      //       conversion from Catalyst internal representation into a 
[[String]]
+      return row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+    } else if (keyGeneratorOpt.isPresent()) {
+      return keyGeneratorOpt.get().getRecordKey(row, structType);
+    } else {
+      return UTF8String.EMPTY_UTF8;
+    }
+  }
+
+  protected HoodieRowCreateHandle getBucketRowCreateHandle(String 
partitionPath, int bucketId) {
+    Map<Integer, HoodieRowCreateHandle> bucketHandleMap = 
bucketHandles.computeIfAbsent(partitionPath, p -> new HashMap<>());
+    if (!bucketHandleMap.isEmpty() && bucketHandleMap.containsKey(bucketId)) {
+      return bucketHandleMap.get(bucketId);
+    }
+    LOG.info("Creating new file for partition path {} and bucket {}", 
partitionPath, bucketId);
+    HoodieRowCreateHandle rowCreateHandle = new 
HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath, 
getNextBucketFileId(bucketId),
+        instantTime, taskPartitionId, taskId, taskEpochId, structType, 
shouldPreserveHoodieMetadata);
+    bucketHandleMap.put(bucketId, rowCreateHandle);
+    return rowCreateHandle;
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (Map<Integer, HoodieRowCreateHandle> entry : bucketHandles.values()) {
+      for (HoodieRowCreateHandle rowCreateHandle : entry.values()) {
+        LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
+        writeStatusList.add(rowCreateHandle.close());
+      }
+      entry.clear();
+    }
+    bucketHandles.clear();
+    handle = null;
+  }
+
+  protected String getNextBucketFileId(int bucketInt) {
+    return BucketIdentifier.newBucketFileIdPrefix(getNextFileId(), bucketInt);
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 3d0683e23f8..6eddc489088 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -54,30 +54,30 @@ public class BulkInsertDataInternalWriterHelper {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BulkInsertDataInternalWriterHelper.class);
 
-  private final String instantTime;
-  private final int taskPartitionId;
-  private final long taskId;
-  private final long taskEpochId;
-  private final HoodieTable hoodieTable;
-  private final HoodieWriteConfig writeConfig;
-  private final StructType structType;
-  private final Boolean arePartitionRecordsSorted;
-  private final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
-  private final String fileIdPrefix;
-  private final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
-  private final boolean populateMetaFields;
-  private final boolean shouldPreserveHoodieMetadata;
-  private final Option<BuiltinKeyGenerator> keyGeneratorOpt;
-  private final boolean simpleKeyGen;
-  private final int simplePartitionFieldIndex;
-  private final DataType simplePartitionFieldDataType;
+  protected final String instantTime;
+  protected final int taskPartitionId;
+  protected final long taskId;
+  protected final long taskEpochId;
+  protected final HoodieTable hoodieTable;
+  protected final HoodieWriteConfig writeConfig;
+  protected final StructType structType;
+  protected final Boolean arePartitionRecordsSorted;
+  protected final List<HoodieInternalWriteStatus> writeStatusList = new 
ArrayList<>();
+  protected final String fileIdPrefix;
+  protected final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
+  protected final boolean populateMetaFields;
+  protected final boolean shouldPreserveHoodieMetadata;
+  protected final Option<BuiltinKeyGenerator> keyGeneratorOpt;
+  protected final boolean simpleKeyGen;
+  protected final int simplePartitionFieldIndex;
+  protected final DataType simplePartitionFieldDataType;
   /**
    * NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
    *       conversion (deserialization) b/w {@link UTF8String} and {@link 
String}
    */
-  private UTF8String lastKnownPartitionPath = null;
-  private HoodieRowCreateHandle handle;
-  private int numFilesWritten = 0;
+  protected UTF8String lastKnownPartitionPath = null;
+  protected HoodieRowCreateHandle handle;
+  protected int numFilesWritten = 0;
 
   public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
                                             String instantTime, int 
taskPartitionId, long taskId, long taskEpochId, StructType structType,
@@ -174,7 +174,7 @@ public class BulkInsertDataInternalWriterHelper {
     handle = null;
   }
 
-  private UTF8String extractPartitionPath(InternalRow row) {
+  protected UTF8String extractPartitionPath(InternalRow row) {
     if (populateMetaFields) {
       // In case meta-fields are materialized w/in the table itself, we can 
just simply extract
       // partition path from there
@@ -215,7 +215,7 @@ public class BulkInsertDataInternalWriterHelper {
         instantTime, taskPartitionId, taskId, taskEpochId, structType, 
shouldPreserveHoodieMetadata);
   }
 
-  private String getNextFileId() {
+  protected String getNextFileId() {
     return String.format("%s-%d", fileIdPrefix, numFilesWritten++);
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ec705e726af..2a2e0d59e7d 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -44,8 +44,9 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils, 
Option => HOption}
 import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, 
INDEX_CLASS_NAME}
 import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.{HoodieException, 
SchemaCompatibilityException}
-import 
org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory,
 NonSortPartitionerWithRows}
+import 
org.apache.hudi.execution.bulkinsert.{BucketBulkInsertPartitionerWithRows, 
BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
 import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.index.HoodieIndex.IndexType
 import org.apache.hudi.internal.DataSourceInternalWriterHelper
 import org.apache.hudi.internal.schema.InternalSchema
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -806,11 +807,15 @@ object HoodieSparkSqlWriter {
     val populateMetaFields = 
hoodieConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS)
 
     val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if 
(populateMetaFields) {
-      val userDefinedBulkInsertPartitionerOpt = 
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
-      if (userDefinedBulkInsertPartitionerOpt.isPresent) {
-        userDefinedBulkInsertPartitionerOpt.get
+      if (writeConfig.getIndexType == IndexType.BUCKET) {
+        new 
BucketBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault,
 writeConfig.getBucketIndexNumBuckets)
       } else {
-        BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, 
isTablePartitioned)
+        val userDefinedBulkInsertPartitionerOpt = 
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
+        if (userDefinedBulkInsertPartitionerOpt.isPresent) {
+          userDefinedBulkInsertPartitionerOpt.get
+        } else {
+          BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, 
isTablePartitioned)
+        }
       }
     } else {
       // Sort modes are not yet supported when meta fields are disabled
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index d40bae13209..e743d6ed952 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1059,6 +1059,62 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
     }
   }
 
+  test("Test Bulk Insert Into Bucket Index Table") {
+    withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        // Create a partitioned table
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  dt string,
+             |  name string,
+             |  price double,
+             |  ts long
+             |) using hudi
+             | tblproperties (
+             | primaryKey = 'id,name',
+             | preCombineField = 'ts',
+             | hoodie.index.type = 'BUCKET',
+             | hoodie.bucket.index.hash.field = 'id,name')
+             | partitioned by (dt)
+             | location '${tmp.getCanonicalPath}'
+       """.stripMargin)
+
+        // Note: Do not write the field alias, the partition field must be 
placed last.
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1,1', 10, 1000, "2021-01-05"),
+             | (2, 'a2', 20, 2000, "2021-01-06"),
+             | (3, 'a3,3', 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+        )
+
+        spark.sql(
+          s"""
+             | insert into $tableName values
+             | (1, 'a1', 10, 1000, "2021-01-05"),
+             | (3, "a3", 30, 3000, "2021-01-07")
+              """.stripMargin)
+
+        checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+          Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+          Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+          Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+          Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+          Seq(3, "a3", 30.0, 3000, "2021-01-07")
+        )
+      }
+    }
+  }
+
   /**
    * This test is to make sure that bulk insert doesn't create a bunch of tiny 
files if
    * hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start 
with the partition columns
diff --git 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
index 24157c694ef..69601a20454 100644
--- 
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
@@ -21,6 +21,8 @@ package org.apache.hudi.spark3.internal;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
+import 
org.apache.hudi.table.action.commit.BucketBulkInsertDataInternalWriterHelper;
+import org.apache.hudi.index.HoodieIndex;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.write.DataWriter;
@@ -39,7 +41,10 @@ public class HoodieBulkInsertDataInternalWriter implements 
DataWriter<InternalRo
   public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable, 
HoodieWriteConfig writeConfig,
                                             String instantTime, int 
taskPartitionId, long taskId, StructType structType, boolean populateMetaFields,
                                             boolean arePartitionRecordsSorted) 
{
-    this.bulkInsertWriterHelper = new 
BulkInsertDataInternalWriterHelper(hoodieTable,
+    this.bulkInsertWriterHelper = writeConfig.getIndexType() == 
HoodieIndex.IndexType.BUCKET
+      ? new BucketBulkInsertDataInternalWriterHelper(hoodieTable,
+        writeConfig, instantTime, taskPartitionId, taskId, 0, structType, 
populateMetaFields, arePartitionRecordsSorted)
+      : new BulkInsertDataInternalWriterHelper(hoodieTable,
         writeConfig, instantTime, taskPartitionId, taskId, 0, structType, 
populateMetaFields, arePartitionRecordsSorted);
   }
 

Reply via email to