This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 1533bac830b [HUDI-9248] Unify code paths for all write operations about bulk_insert (#13360)
1533bac830b is described below
commit 1533bac830b6396688cb4db34ff938f6bcebea21
Author: TheR1sing3un <[email protected]>
AuthorDate: Wed Jun 4 20:44:45 2025 +0800
[HUDI-9248] Unify code paths for all write operations about bulk_insert (#13360)
* refactor: Unify all the code paths of bulk insert operations
---------
Signed-off-by: TheR1sing3un <[email protected]>
---
.../BaseDatasetBulkInsertCommitActionExecutor.java | 12 +++--
.../DatasetBulkInsertCommitActionExecutor.java | 57 ++--------------------
...eamerDatasetBulkInsertCommitActionExecutor.java | 20 +++-----
.../org/apache/hudi/HoodieSparkSqlWriter.scala | 26 ++++------
.../TestSparkDataSourceDAGExecution.scala | 2 +-
5 files changed, 31 insertions(+), 86 deletions(-)
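
The net effect of the change: BaseDatasetBulkInsertCommitActionExecutor now owns the single doExecute() path (transition the requested instant to inflight, then call HoodieDatasetBulkInsertHelper.bulkInsert), while subclasses only declare the write operation type and how replaced file ids are reported. Below is a minimal, simplified sketch of that template-method shape; the types in it (BulkInsertExecutor, PlainBulkInsertExecutor) are illustrative stand-ins, not Hudi classes.

// Minimal sketch of the pattern this commit converges on; stand-in types, not Hudi code.
import java.util.Collections;
import java.util.List;
import java.util.Map;

abstract class BulkInsertExecutor {
  // Shared path: every bulk_insert variant transitions the instant and delegates
  // to one helper, mirroring the new base-class doExecute().
  final Map<String, List<String>> execute(List<String> records) {
    transitionRequestedToInflight();
    bulkInsert(records);
    return getPartitionToReplacedFileIds();
  }

  void transitionRequestedToInflight() {
    System.out.println("requested -> inflight");
  }

  void bulkInsert(List<String> records) {
    System.out.println("bulk inserting " + records.size() + " records");
  }

  // Subclasses differ only in how replaced file ids are reported
  // (empty for plain bulk_insert, populated for overwrite-style operations).
  protected abstract Map<String, List<String>> getPartitionToReplacedFileIds();
}

class PlainBulkInsertExecutor extends BulkInsertExecutor {
  @Override
  protected Map<String, List<String>> getPartitionToReplacedFileIds() {
    return Collections.emptyMap();
  }
}

public class BulkInsertSketch {
  public static void main(String[] args) {
    new PlainBulkInsertExecutor().execute(List.of("r1", "r2"));
  }
}
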
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index acdc139cf7c..3b68659abf7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -72,7 +73,12 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
writeClient.preWrite(instantTime, getWriteOperationType(), table.getMetaClient());
}
- protected abstract Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted);
+ protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
+ table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
+ getCommitActionType(), instantTime), Option.empty());
+ return Option.of(HoodieDatasetBulkInsertHelper
+ .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
+ }
protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
writeClient.postWrite(result, instantTime, table);
@@ -132,9 +138,7 @@ public abstract class BaseDatasetBulkInsertCommitActionExecutor implements Seria
}
}
- protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
- return Collections.emptyMap();
- }
+ protected abstract Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses);
public String getInstantTime() {
return instantTime;
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
index 62d89b2c034..8b124de565c 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
@@ -18,27 +18,15 @@
package org.apache.hudi.commit;
-import org.apache.hudi.HoodieSparkUtils;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieInternalConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.internal.DataSourceInternalWriterHelper;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
import java.util.Collections;
-import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsertCommitActionExecutor {
@@ -47,47 +35,12 @@ public class DatasetBulkInsertCommitActionExecutor extends BaseDatasetBulkInsert
super(config, writeClient);
}
- @Override
- protected void preExecute() {
- instantTime = writeClient.startCommit();
- table = writeClient.initTable(getWriteOperationType(), Option.ofNullable(instantTime));
- }
-
- @Override
- protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
- Map<String, String> opts = writeConfig.getProps().entrySet().stream().collect(Collectors.toMap(
- e -> String.valueOf(e.getKey()),
- e -> String.valueOf(e.getValue())));
- Map<String, String> optsOverrides = Collections.singletonMap(
- HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, String.valueOf(arePartitionRecordsSorted));
-
- String targetFormat;
- Map<String, String> customOpts = new HashMap<>(1);
- if (HoodieSparkUtils.isSpark3()) {
- targetFormat = "org.apache.hudi.spark.internal";
- customOpts.put(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key(), records.schema().json());
- } else {
- throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
- + " To use row writer please switch to spark 3");
- }
-
- records.write().format(targetFormat)
- .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
- .options(opts)
- .options(customOpts)
- .options(optsOverrides)
- .mode(SaveMode.Append)
- .save();
- return Option.empty();
- }
-
- @Override
- protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> result) {
- // no op
- }
-
@Override
public WriteOperationType getWriteOperationType() {
return WriteOperationType.BULK_INSERT;
}
+
+ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+ return Collections.emptyMap();
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index f5f5d47e6cc..8e183629f48 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -18,17 +18,15 @@
package org.apache.hudi.commit;
-import org.apache.hudi.HoodieDatasetBulkInsertHelper;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
/**
* Executor to be used by stream sync. Directly invokes HoodieDatasetBulkInsertHelper.bulkInsert so that WriteStatus is
@@ -40,16 +38,12 @@ public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends BaseDat
super(config, writeClient);
}
- @Override
- protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, boolean arePartitionRecordsSorted) {
- table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
- getCommitActionType(), instantTime), Option.empty());
- return Option.of(HoodieDatasetBulkInsertHelper
- .bulkInsert(records, instantTime, table, writeConfig, arePartitionRecordsSorted, false, getWriteOperationType()));
- }
-
@Override
public WriteOperationType getWriteOperationType() {
return WriteOperationType.BULK_INSERT;
}
+
+ protected Map<String, List<String>> getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+ return Collections.emptyMap();
+ }
}
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6c0ad0c8ac0..e6588053425 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -841,17 +841,9 @@ class HoodieSparkSqlWriterInternal {
val instantTime = executor.getInstantTime
try {
- val (writeSuccessful, compactionInstant, clusteringInstant) = mode match {
- case _ if overwriteOperationType == null =>
- val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema)
- (syncHiveSuccess, HOption.empty().asInstanceOf[HOption[String]], HOption.empty().asInstanceOf[HOption[String]])
- case _ =>
- try {
- commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
- TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
-
- }
- }
+ val (writeSuccessful, compactionInstant, clusteringInstant) = commitAndPerformPostOperations(
+ sqlContext.sparkSession, df.schema, writeResult, parameters, writeClient, tableConfig, jsc,
+ TableInstantInfo(basePath, instantTime, executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
(writeSuccessful, HOption.ofNullable(instantTime), compactionInstant, clusteringInstant, writeClient, tableConfig)
} finally {
// close the write client in all cases
@@ -993,11 +985,13 @@ class HoodieSparkSqlWriterInternal {
): (Boolean, HOption[java.lang.String], HOption[java.lang.String]) = {
if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 0) {
log.info("Proceeding to commit the write.")
- val metaMap = parameters.filter(kv =>
- kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
+ // get extra metadata from props
+ // 1. properties starting with commit metadata key prefix
+ // 2. properties related to checkpoint in spark streaming
+ val extraMetadataOpt = common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
val commitSuccess =
client.commit(tableInstantInfo.instantTime, writeResult.getWriteStatuses,
- common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)),
+ extraMetadataOpt,
tableInstantInfo.commitActionType,
writeResult.getPartitionToReplaceFileIds,
common.util.Option.ofNullable(extraPreCommitFn.orNull))
@@ -1012,7 +1006,7 @@ class HoodieSparkSqlWriterInternal {
val asyncCompactionEnabled = isAsyncCompactionEnabled(client, tableConfig, parameters, jsc.hadoopConfiguration())
val compactionInstant: common.util.Option[java.lang.String] =
if (asyncCompactionEnabled) {
- client.scheduleCompaction(common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)))
+ client.scheduleCompaction(extraMetadataOpt)
} else {
common.util.Option.empty()
}
@@ -1022,7 +1016,7 @@ class HoodieSparkSqlWriterInternal {
val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
val clusteringInstant: common.util.Option[java.lang.String] =
if (asyncClusteringEnabled) {
- client.scheduleClustering(common.util.Option.of(new java.util.HashMap[String, String](metaMap.asJava)))
+ client.scheduleClustering(extraMetadataOpt)
} else {
common.util.Option.empty()
}
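
On the Spark SQL writer side, the extra commit metadata is now assembled once via DataSourceUtils.getExtraMetadata (prefix-based properties plus Spark streaming checkpoint properties, per the new comments) and the same option is reused for commit(), scheduleCompaction() and scheduleClustering(), instead of rebuilding a HashMap from metaMap at each call site. A minimal sketch of that build-once, reuse-everywhere pattern follows, using an illustrative prefix and plain java.util types rather than Hudi's helpers.

// Minimal sketch, not Hudi code: gather commit metadata from write parameters once
// and reuse the same optional for commit and for scheduling compaction/clustering.
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ExtraMetadataSketch {
  // Illustrative prefix; in Hudi the prefix comes from the COMMIT_METADATA_KEYPREFIX config value.
  static final String METADATA_KEY_PREFIX = "_";

  // Collect every property whose key starts with the configured prefix.
  static Map<String, String> extraMetadata(Map<String, String> params) {
    Map<String, String> extra = new HashMap<>();
    params.forEach((k, v) -> {
      if (k.startsWith(METADATA_KEY_PREFIX)) {
        extra.put(k, v);
      }
    });
    return extra;
  }

  public static void main(String[] args) {
    Map<String, String> params = Map.of("_job", "nightly-load", "hoodie.table.name", "t1");
    // Built once; the same instance would back commit(), scheduleCompaction() and scheduleClustering().
    Optional<Map<String, String>> extraMetadataOpt = Optional.of(extraMetadata(params));
    System.out.println(extraMetadataOpt.get());
  }
}
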
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
index 98b4be1393b..0255b22c77e 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
@@ -93,7 +93,7 @@ class TestSparkDataSourceDAGExecution extends HoodieSparkClientTestBase with Sca
@CsvSource(Array(
"upsert,org.apache.hudi.client.SparkRDDWriteClient.commit",
"insert,org.apache.hudi.client.SparkRDDWriteClient.commit",
- "bulk_insert,org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow"))
+ "bulk_insert,org.apache.hudi.client.SparkRDDWriteClient.commit"))
def testWriteOperationDoesNotTriggerRepeatedDAG(operation: String, event: String): Unit = {
// register stage event listeners
val stageListener = new StageListener(event)