This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9d58ee4b1f1 [HUDI-5994] Bucket index supports bulk insert row writer
(#8776)
9d58ee4b1f1 is described below
commit 9d58ee4b1f1fce213ee3f4ff478eb003da943924
Author: Danny Chan <[email protected]>
AuthorDate: Wed May 24 20:09:54 2023 +0800
[HUDI-5994] Bucket index supports bulk insert row writer (#8776)
Co-authored-by: y00617041 <[email protected]>
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../apache/hudi/index/bucket/BucketIdentifier.java | 4 +
.../BucketBulkInsertPartitionerWithRows.java | 81 ++++++++++++++
.../BucketBulkInsertDataInternalWriterHelper.java | 124 +++++++++++++++++++++
.../commit/BulkInsertDataInternalWriterHelper.java | 44 ++++----
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 15 ++-
.../apache/spark/sql/hudi/TestInsertTable.scala | 56 ++++++++++
.../HoodieBulkInsertDataInternalWriter.java | 7 +-
8 files changed, 307 insertions(+), 28 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e013da5f3a1..878d185af06 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1944,6 +1944,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getString(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD);
}
+ public String getBucketIndexHashFieldWithDefault() {
+ return getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD,
getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME));
+ }
+
/**
* storage properties.
*/
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
index b8d104e2c57..088e851fd8d 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bucket/BucketIdentifier.java
@@ -91,6 +91,10 @@ public class BucketIdentifier implements Serializable {
return newBucketFileIdPrefix(bucketIdStr(bucketId));
}
+ public static String newBucketFileIdPrefix(String fileId, int bucketId) {
+ return fileId.replaceFirst(".{8}", bucketIdStr(bucketId));
+ }
+
public static String newBucketFileIdPrefix(String bucketId) {
return FSUtils.createNewFileIdPfx().replaceFirst(".{8}", bucketId);
}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
new file mode 100644
index 00000000000..3efd3f61d40
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketBulkInsertPartitionerWithRows.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.table.BulkInsertPartitioner;
+
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import scala.Tuple2;
+
+/**
+ * Bulk_insert partitioner of Spark row using bucket index.
+ */
+public class BucketBulkInsertPartitionerWithRows implements
BulkInsertPartitioner<Dataset<Row>> {
+
+ private final String indexKeyFields;
+ private final int bucketNum;
+
+ public BucketBulkInsertPartitionerWithRows(String indexKeyFields, int
bucketNum) {
+ this.indexKeyFields = indexKeyFields;
+ this.bucketNum = bucketNum;
+ }
+
+ @Override
+ public Dataset<Row> repartitionRecords(Dataset<Row> rows, int
outputPartitions) {
+ Partitioner partitioner = new Partitioner() {
+ @Override
+ public int getPartition(Object key) {
+ return (Integer) key;
+ }
+
+ @Override
+ public int numPartitions() {
+ return outputPartitions;
+ }
+ };
+
+ JavaRDD<Row> rddRows = rows.toJavaRDD()
+ .mapToPair(row -> new Tuple2<>(getPartitionKey(row,
this.indexKeyFields, this.bucketNum, outputPartitions), row))
+ .partitionBy(partitioner)
+ .values();
+ return rows.sparkSession().createDataFrame(rddRows, rows.schema());
+ }
+
+ @Override
+ public boolean arePartitionRecordsSorted() {
+ return false;
+ }
+
+ private static int getPartitionKey(Row row, String indexKeyFields, int
bucketNum, int partitionNum) {
+ int bucketId =
BucketIdentifier.getBucketId(row.getString(HoodieRecord.RECORD_KEY_META_FIELD_ORD),
indexKeyFields, bucketNum);
+ String partition =
row.getString(HoodieRecord.PARTITION_PATH_META_FIELD_ORD);
+ if (partition == null || partition.trim().isEmpty()) {
+ return bucketId;
+ } else {
+ int pw = (partition.hashCode() & Integer.MAX_VALUE) % partitionNum;
+ return BucketIdentifier.mod(bucketId + pw, partitionNum);
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
new file mode 100644
index 00000000000..5c0816eb255
--- /dev/null
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BucketBulkInsertDataInternalWriterHelper.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.bucket.BucketIdentifier;
+import org.apache.hudi.io.storage.row.HoodieRowCreateHandle;
+import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Helper class for native row writer for bulk_insert with bucket index.
+ */
+public class BucketBulkInsertDataInternalWriterHelper extends
BulkInsertDataInternalWriterHelper {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(BucketBulkInsertDataInternalWriterHelper.class);
+
+ private int lastKnownBucketNum = -1;
+ // partition path -> (bucket id -> handle)
+ private final Map<String, Map<Integer, HoodieRowCreateHandle>> bucketHandles;
+ private final String indexKeyFields;
+ private final int bucketNum;
+
+ public BucketBulkInsertDataInternalWriterHelper(HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
+ String instantTime, int
taskPartitionId, long taskId, long taskEpochId, StructType structType,
+ boolean populateMetaFields,
boolean arePartitionRecordsSorted) {
+ super(hoodieTable, writeConfig, instantTime, taskPartitionId, taskId,
taskEpochId, structType, populateMetaFields, arePartitionRecordsSorted);
+ this.indexKeyFields =
writeConfig.getStringOrDefault(HoodieIndexConfig.BUCKET_INDEX_HASH_FIELD,
writeConfig.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key()));
+ this.bucketNum =
writeConfig.getInt(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
+ this.bucketHandles = new HashMap<>();
+ }
+
+ public void write(InternalRow row) throws IOException {
+ try {
+ UTF8String partitionPath = extractPartitionPath(row);
+ UTF8String recordKey = extractRecordKey(row);
+ int bucketId = BucketIdentifier.getBucketId(String.valueOf(recordKey),
indexKeyFields, bucketNum);
+ if (lastKnownPartitionPath == null ||
!Objects.equals(lastKnownPartitionPath, partitionPath) || !handle.canWrite() ||
bucketId != lastKnownBucketNum) {
+ handle = getBucketRowCreateHandle(String.valueOf(partitionPath),
bucketId);
+ // NOTE: It's crucial to make a copy here, since [[UTF8String]] could
be pointing into
+ // a mutable underlying buffer
+ lastKnownPartitionPath = partitionPath.clone();
+ lastKnownBucketNum = bucketId;
+ }
+
+ handle.write(row);
+ } catch (Throwable t) {
+ LOG.error("Global error thrown while trying to write records in
HoodieRowCreateHandle ", t);
+ throw t;
+ }
+ }
+
+ private UTF8String extractRecordKey(InternalRow row) {
+ if (populateMetaFields) {
+ // In case meta-fields are materialized w/in the table itself, we can
just simply extract
+ // the record key from there
+ //
+ // NOTE: Helper keeps track of [[lastKnownPartitionPath]] as
[[UTF8String]] to avoid
+ // conversion from Catalyst internal representation into a
[[String]]
+ return row.getUTF8String(HoodieRecord.RECORD_KEY_META_FIELD_ORD);
+ } else if (keyGeneratorOpt.isPresent()) {
+ return keyGeneratorOpt.get().getRecordKey(row, structType);
+ } else {
+ return UTF8String.EMPTY_UTF8;
+ }
+ }
+
+ protected HoodieRowCreateHandle getBucketRowCreateHandle(String
partitionPath, int bucketId) {
+ Map<Integer, HoodieRowCreateHandle> bucketHandleMap =
bucketHandles.computeIfAbsent(partitionPath, p -> new HashMap<>());
+ if (!bucketHandleMap.isEmpty() && bucketHandleMap.containsKey(bucketId)) {
+ return bucketHandleMap.get(bucketId);
+ }
+ LOG.info("Creating new file for partition path {} and bucket {}",
partitionPath, bucketId);
+ HoodieRowCreateHandle rowCreateHandle = new
HoodieRowCreateHandle(hoodieTable, writeConfig, partitionPath,
getNextBucketFileId(bucketId),
+ instantTime, taskPartitionId, taskId, taskEpochId, structType,
shouldPreserveHoodieMetadata);
+ bucketHandleMap.put(bucketId, rowCreateHandle);
+ return rowCreateHandle;
+ }
+
+ @Override
+ public void close() throws IOException {
+ for (Map<Integer, HoodieRowCreateHandle> entry : bucketHandles.values()) {
+ for (HoodieRowCreateHandle rowCreateHandle : entry.values()) {
+ LOG.info("Closing bulk insert file " + rowCreateHandle.getFileName());
+ writeStatusList.add(rowCreateHandle.close());
+ }
+ entry.clear();
+ }
+ bucketHandles.clear();
+ handle = null;
+ }
+
+ protected String getNextBucketFileId(int bucketInt) {
+ return BucketIdentifier.newBucketFileIdPrefix(getNextFileId(), bucketInt);
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
index 3d0683e23f8..6eddc489088 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BulkInsertDataInternalWriterHelper.java
@@ -54,30 +54,30 @@ public class BulkInsertDataInternalWriterHelper {
private static final Logger LOG =
LoggerFactory.getLogger(BulkInsertDataInternalWriterHelper.class);
- private final String instantTime;
- private final int taskPartitionId;
- private final long taskId;
- private final long taskEpochId;
- private final HoodieTable hoodieTable;
- private final HoodieWriteConfig writeConfig;
- private final StructType structType;
- private final Boolean arePartitionRecordsSorted;
- private final List<HoodieInternalWriteStatus> writeStatusList = new
ArrayList<>();
- private final String fileIdPrefix;
- private final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
- private final boolean populateMetaFields;
- private final boolean shouldPreserveHoodieMetadata;
- private final Option<BuiltinKeyGenerator> keyGeneratorOpt;
- private final boolean simpleKeyGen;
- private final int simplePartitionFieldIndex;
- private final DataType simplePartitionFieldDataType;
+ protected final String instantTime;
+ protected final int taskPartitionId;
+ protected final long taskId;
+ protected final long taskEpochId;
+ protected final HoodieTable hoodieTable;
+ protected final HoodieWriteConfig writeConfig;
+ protected final StructType structType;
+ protected final Boolean arePartitionRecordsSorted;
+ protected final List<HoodieInternalWriteStatus> writeStatusList = new
ArrayList<>();
+ protected final String fileIdPrefix;
+ protected final Map<String, HoodieRowCreateHandle> handles = new HashMap<>();
+ protected final boolean populateMetaFields;
+ protected final boolean shouldPreserveHoodieMetadata;
+ protected final Option<BuiltinKeyGenerator> keyGeneratorOpt;
+ protected final boolean simpleKeyGen;
+ protected final int simplePartitionFieldIndex;
+ protected final DataType simplePartitionFieldDataType;
/**
* NOTE: This is stored as Catalyst's internal {@link UTF8String} to avoid
* conversion (deserialization) b/w {@link UTF8String} and {@link
String}
*/
- private UTF8String lastKnownPartitionPath = null;
- private HoodieRowCreateHandle handle;
- private int numFilesWritten = 0;
+ protected UTF8String lastKnownPartitionPath = null;
+ protected HoodieRowCreateHandle handle;
+ protected int numFilesWritten = 0;
public BulkInsertDataInternalWriterHelper(HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
String instantTime, int
taskPartitionId, long taskId, long taskEpochId, StructType structType,
@@ -174,7 +174,7 @@ public class BulkInsertDataInternalWriterHelper {
handle = null;
}
- private UTF8String extractPartitionPath(InternalRow row) {
+ protected UTF8String extractPartitionPath(InternalRow row) {
if (populateMetaFields) {
// In case meta-fields are materialized w/in the table itself, we can
just simply extract
// partition path from there
@@ -215,7 +215,7 @@ public class BulkInsertDataInternalWriterHelper {
instantTime, taskPartitionId, taskId, taskEpochId, structType,
shouldPreserveHoodieMetadata);
}
- private String getNextFileId() {
+ protected String getNextFileId() {
return String.format("%s-%d", fileIdPrefix, numFilesWritten++);
}
}
diff --git
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index ec705e726af..2a2e0d59e7d 100644
---
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -44,8 +44,9 @@ import org.apache.hudi.common.util.{CommitUtils, StringUtils,
Option => HOption}
import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH,
INDEX_CLASS_NAME}
import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig}
import org.apache.hudi.exception.{HoodieException,
SchemaCompatibilityException}
-import
org.apache.hudi.execution.bulkinsert.{BulkInsertInternalPartitionerWithRowsFactory,
NonSortPartitionerWithRows}
+import
org.apache.hudi.execution.bulkinsert.{BucketBulkInsertPartitionerWithRows,
BulkInsertInternalPartitionerWithRowsFactory, NonSortPartitionerWithRows}
import org.apache.hudi.hive.{HiveSyncConfigHolder, HiveSyncTool}
+import org.apache.hudi.index.HoodieIndex.IndexType
import org.apache.hudi.internal.DataSourceInternalWriterHelper
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
@@ -806,11 +807,15 @@ object HoodieSparkSqlWriter {
val populateMetaFields =
hoodieConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS)
val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if
(populateMetaFields) {
- val userDefinedBulkInsertPartitionerOpt =
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
- if (userDefinedBulkInsertPartitionerOpt.isPresent) {
- userDefinedBulkInsertPartitionerOpt.get
+ if (writeConfig.getIndexType == IndexType.BUCKET) {
+ new
BucketBulkInsertPartitionerWithRows(writeConfig.getBucketIndexHashFieldWithDefault,
writeConfig.getBucketIndexNumBuckets)
} else {
- BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig,
isTablePartitioned)
+ val userDefinedBulkInsertPartitionerOpt =
DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
+ if (userDefinedBulkInsertPartitionerOpt.isPresent) {
+ userDefinedBulkInsertPartitionerOpt.get
+ } else {
+ BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig,
isTablePartitioned)
+ }
}
} else {
// Sort modes are not yet supported when meta fields are disabled
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
index d40bae13209..e743d6ed952 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala
@@ -1059,6 +1059,62 @@ class TestInsertTable extends HoodieSparkSqlTestBase {
}
}
+ test("Test Bulk Insert Into Bucket Index Table") {
+ withSQLConf("hoodie.datasource.write.operation" -> "bulk_insert") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ // Create a partitioned table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | dt string,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | primaryKey = 'id,name',
+ | preCombineField = 'ts',
+ | hoodie.index.type = 'BUCKET',
+ | hoodie.bucket.index.hash.field = 'id,name')
+ | partitioned by (dt)
+ | location '${tmp.getCanonicalPath}'
+ """.stripMargin)
+
+ // Note: Do not write the field alias; the partition field must be
placed last.
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1,1', 10, 1000, "2021-01-05"),
+ | (2, 'a2', 20, 2000, "2021-01-06"),
+ | (3, 'a3,3', 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07")
+ )
+
+ spark.sql(
+ s"""
+ | insert into $tableName values
+ | (1, 'a1', 10, 1000, "2021-01-05"),
+ | (3, "a3", 30, 3000, "2021-01-07")
+ """.stripMargin)
+
+ checkAnswer(s"select id, name, price, ts, dt from $tableName")(
+ Seq(1, "a1,1", 10.0, 1000, "2021-01-05"),
+ Seq(1, "a1", 10.0, 1000, "2021-01-05"),
+ Seq(2, "a2", 20.0, 2000, "2021-01-06"),
+ Seq(3, "a3,3", 30.0, 3000, "2021-01-07"),
+ Seq(3, "a3", 30.0, 3000, "2021-01-07")
+ )
+ }
+ }
+ }
+
/**
* This test is to make sure that bulk insert doesn't create a bunch of tiny
files if
* hoodie.bulkinsert.user.defined.partitioner.sort.columns doesn't start
with the partition columns
diff --git
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
index 24157c694ef..69601a20454 100644
---
a/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark3-common/src/main/java/org/apache/hudi/spark3/internal/HoodieBulkInsertDataInternalWriter.java
@@ -21,6 +21,8 @@ package org.apache.hudi.spark3.internal;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.commit.BulkInsertDataInternalWriterHelper;
+import
org.apache.hudi.table.action.commit.BucketBulkInsertDataInternalWriterHelper;
+import org.apache.hudi.index.HoodieIndex;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
@@ -39,7 +41,10 @@ public class HoodieBulkInsertDataInternalWriter implements
DataWriter<InternalRo
public HoodieBulkInsertDataInternalWriter(HoodieTable hoodieTable,
HoodieWriteConfig writeConfig,
String instantTime, int
taskPartitionId, long taskId, StructType structType, boolean populateMetaFields,
boolean arePartitionRecordsSorted)
{
- this.bulkInsertWriterHelper = new
BulkInsertDataInternalWriterHelper(hoodieTable,
+ this.bulkInsertWriterHelper = writeConfig.getIndexType() ==
HoodieIndex.IndexType.BUCKET
+ ? new BucketBulkInsertDataInternalWriterHelper(hoodieTable,
+ writeConfig, instantTime, taskPartitionId, taskId, 0, structType,
populateMetaFields, arePartitionRecordsSorted)
+ : new BulkInsertDataInternalWriterHelper(hoodieTable,
writeConfig, instantTime, taskPartitionId, taskId, 0, structType,
populateMetaFields, arePartitionRecordsSorted);
}