This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1533bac830b [HUDI-9248] Unify code paths for all write operations 
about bulk_insert (#13360)
1533bac830b is described below

commit 1533bac830b6396688cb4db34ff938f6bcebea21
Author: TheR1sing3un <[email protected]>
AuthorDate: Wed Jun 4 20:44:45 2025 +0800

    [HUDI-9248] Unify code paths for all write operations about bulk_insert 
(#13360)
    
    * refactor: Unify all the code paths of bulk insert operations
    
    ---------
    
    Signed-off-by: TheR1sing3un <[email protected]>
---
 .../BaseDatasetBulkInsertCommitActionExecutor.java | 12 +++--
 .../DatasetBulkInsertCommitActionExecutor.java     | 57 ++--------------------
 ...eamerDatasetBulkInsertCommitActionExecutor.java | 20 +++-----
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 26 ++++------
 .../TestSparkDataSourceDAGExecution.scala          |  2 +-
 5 files changed, 31 insertions(+), 86 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index acdc139cf7c..3b68659abf7 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -72,7 +73,12 @@ public abstract class 
BaseDatasetBulkInsertCommitActionExecutor implements Seria
     writeClient.preWrite(instantTime, getWriteOperationType(), 
table.getMetaClient());
   }
 
-  protected abstract Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> 
records, boolean arePartitionRecordsSorted);
+  protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, 
boolean arePartitionRecordsSorted) {
+    
table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
+        getCommitActionType(), instantTime), Option.empty());
+    return Option.of(HoodieDatasetBulkInsertHelper
+        .bulkInsert(records, instantTime, table, writeConfig, 
arePartitionRecordsSorted, false, getWriteOperationType()));
+  }
 
   protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
result) {
     writeClient.postWrite(result, instantTime, table);
@@ -132,9 +138,7 @@ public abstract class 
BaseDatasetBulkInsertCommitActionExecutor implements Seria
     }
   }
 
-  protected Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
-    return Collections.emptyMap();
-  }
+  protected abstract Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses);
 
   public String getInstantTime() {
     return instantTime;
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
index 62d89b2c034..8b124de565c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
@@ -18,27 +18,15 @@
 
 package org.apache.hudi.commit;
 
-import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieInternalConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.internal.DataSourceInternalWriterHelper;
-import org.apache.hudi.table.action.HoodieWriteMetadata;
-
-import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SaveMode;
 
 import java.util.Collections;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 public class DatasetBulkInsertCommitActionExecutor extends 
BaseDatasetBulkInsertCommitActionExecutor {
 
@@ -47,47 +35,12 @@ public class DatasetBulkInsertCommitActionExecutor extends 
BaseDatasetBulkInsert
     super(config, writeClient);
   }
 
-  @Override
-  protected void preExecute() {
-    instantTime = writeClient.startCommit();
-    table = writeClient.initTable(getWriteOperationType(), 
Option.ofNullable(instantTime));
-  }
-
-  @Override
-  protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, 
boolean arePartitionRecordsSorted) {
-    Map<String, String> opts = 
writeConfig.getProps().entrySet().stream().collect(Collectors.toMap(
-        e -> String.valueOf(e.getKey()),
-        e -> String.valueOf(e.getValue())));
-    Map<String, String> optsOverrides = Collections.singletonMap(
-        HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED, 
String.valueOf(arePartitionRecordsSorted));
-
-    String targetFormat;
-    Map<String, String> customOpts = new HashMap<>(1);
-    if (HoodieSparkUtils.isSpark3()) {
-      targetFormat = "org.apache.hudi.spark.internal";
-      
customOpts.put(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key(), 
records.schema().json());
-    } else {
-      throw new HoodieException("Bulk insert using row writer is not supported 
with current Spark version."
-          + " To use row writer please switch to spark 3");
-    }
-
-    records.write().format(targetFormat)
-        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, 
instantTime)
-        .options(opts)
-        .options(customOpts)
-        .options(optsOverrides)
-        .mode(SaveMode.Append)
-        .save();
-    return Option.empty();
-  }
-
-  @Override
-  protected void afterExecute(HoodieWriteMetadata<JavaRDD<WriteStatus>> 
result) {
-    // no op
-  }
-
   @Override
   public WriteOperationType getWriteOperationType() {
     return WriteOperationType.BULK_INSERT;
   }
+
+  protected Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+    return Collections.emptyMap();
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index f5f5d47e6cc..8e183629f48 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -18,17 +18,15 @@
 
 package org.apache.hudi.commit;
 
-import org.apache.hudi.HoodieDatasetBulkInsertHelper;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Executor to be used by stream sync. Directly invokes 
HoodieDatasetBulkInsertHelper.bulkInsert so that WriteStatus is
@@ -40,16 +38,12 @@ public class 
HoodieStreamerDatasetBulkInsertCommitActionExecutor extends BaseDat
     super(config, writeClient);
   }
 
-  @Override
-  protected Option<HoodieData<WriteStatus>> doExecute(Dataset<Row> records, 
boolean arePartitionRecordsSorted) {
-    
table.getActiveTimeline().transitionRequestedToInflight(table.getMetaClient().createNewInstant(HoodieInstant.State.REQUESTED,
-            getCommitActionType(), instantTime), Option.empty());
-    return Option.of(HoodieDatasetBulkInsertHelper
-        .bulkInsert(records, instantTime, table, writeConfig, 
arePartitionRecordsSorted, false, getWriteOperationType()));
-  }
-
   @Override
   public WriteOperationType getWriteOperationType() {
     return WriteOperationType.BULK_INSERT;
   }
+
+  protected Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
+    return Collections.emptyMap();
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 6c0ad0c8ac0..e6588053425 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -841,17 +841,9 @@ class HoodieSparkSqlWriterInternal {
     val instantTime = executor.getInstantTime
 
     try {
-      val (writeSuccessful, compactionInstant, clusteringInstant) = mode match 
{
-        case _ if overwriteOperationType == null =>
-          val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, 
basePath, df.schema)
-          (syncHiveSuccess, HOption.empty().asInstanceOf[HOption[String]], 
HOption.empty().asInstanceOf[HOption[String]])
-        case _ =>
-          try {
-            commitAndPerformPostOperations(sqlContext.sparkSession, df.schema, 
writeResult, parameters, writeClient, tableConfig, jsc,
-              TableInstantInfo(basePath, instantTime, 
executor.getCommitActionType, executor.getWriteOperationType), Option.empty)
-
-          }
-      }
+      val (writeSuccessful, compactionInstant, clusteringInstant) = 
commitAndPerformPostOperations(
+        sqlContext.sparkSession, df.schema, writeResult, parameters, 
writeClient, tableConfig, jsc,
+        TableInstantInfo(basePath, instantTime, executor.getCommitActionType, 
executor.getWriteOperationType), Option.empty)
       (writeSuccessful, HOption.ofNullable(instantTime), compactionInstant, 
clusteringInstant, writeClient, tableConfig)
     } finally {
       // close the write client in all cases
@@ -993,11 +985,13 @@ class HoodieSparkSqlWriterInternal {
                                             ): (Boolean, 
HOption[java.lang.String], HOption[java.lang.String]) = {
     if (writeResult.getWriteStatuses.rdd.filter(ws => ws.hasErrors).count() == 
0) {
       log.info("Proceeding to commit the write.")
-      val metaMap = parameters.filter(kv =>
-        kv._1.startsWith(parameters(COMMIT_METADATA_KEYPREFIX.key)))
+      // get extra metadata from props
+      // 1. properties starting with commit metadata key prefix
+      // 2. properties related to checkpoint in spark streaming
+      val extraMetadataOpt = 
common.util.Option.of(DataSourceUtils.getExtraMetadata(parameters.asJava))
       val commitSuccess =
         client.commit(tableInstantInfo.instantTime, 
writeResult.getWriteStatuses,
-          common.util.Option.of(new java.util.HashMap[String, 
String](metaMap.asJava)),
+          extraMetadataOpt,
           tableInstantInfo.commitActionType,
           writeResult.getPartitionToReplaceFileIds,
           common.util.Option.ofNullable(extraPreCommitFn.orNull))
@@ -1012,7 +1006,7 @@ class HoodieSparkSqlWriterInternal {
       val asyncCompactionEnabled = isAsyncCompactionEnabled(client, 
tableConfig, parameters, jsc.hadoopConfiguration())
       val compactionInstant: common.util.Option[java.lang.String] =
         if (asyncCompactionEnabled) {
-          client.scheduleCompaction(common.util.Option.of(new 
java.util.HashMap[String, String](metaMap.asJava)))
+          client.scheduleCompaction(extraMetadataOpt)
         } else {
           common.util.Option.empty()
         }
@@ -1022,7 +1016,7 @@ class HoodieSparkSqlWriterInternal {
       val asyncClusteringEnabled = isAsyncClusteringEnabled(client, parameters)
       val clusteringInstant: common.util.Option[java.lang.String] =
         if (asyncClusteringEnabled) {
-          client.scheduleClustering(common.util.Option.of(new 
java.util.HashMap[String, String](metaMap.asJava)))
+          client.scheduleClustering(extraMetadataOpt)
         } else {
           common.util.Option.empty()
         }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
index 98b4be1393b..0255b22c77e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkDataSourceDAGExecution.scala
@@ -93,7 +93,7 @@ class TestSparkDataSourceDAGExecution extends 
HoodieSparkClientTestBase with Sca
   @CsvSource(Array(
     "upsert,org.apache.hudi.client.SparkRDDWriteClient.commit",
     "insert,org.apache.hudi.client.SparkRDDWriteClient.commit",
-    
"bulk_insert,org.apache.hudi.HoodieSparkSqlWriterInternal.bulkInsertAsRow"))
+    "bulk_insert,org.apache.hudi.client.SparkRDDWriteClient.commit"))
   def testWriteOperationDoesNotTriggerRepeatedDAG(operation: String, event: 
String): Unit = {
     // register stage event listeners
     val stageListener = new StageListener(event)

Reply via email to