This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ae426bc483f [HUDI-5351] Handle populateMetaFields when repartitioning in sort partitioner (#7411)
ae426bc483f is described below

commit ae426bc483ffb310e99738219e6ecc9cb8336c0c
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Tue Dec 13 10:22:10 2022 +0530

    [HUDI-5351] Handle populateMetaFields when repartitioning in sort partitioner (#7411)
---
 .../MultipleSparkJobExecutionStrategy.java         |  6 +-
 .../BulkInsertInternalPartitionerFactory.java      | 26 +++----
 ...lkInsertInternalPartitionerWithRowsFactory.java | 19 ++---
 .../bulkinsert/GlobalSortPartitioner.java          | 14 ++++
 .../bulkinsert/GlobalSortPartitionerWithRows.java  | 14 ++++
 ...PartitionPathRepartitionAndSortPartitioner.java | 12 +++-
 ...nPathRepartitionAndSortPartitionerWithRows.java | 12 +++-
 .../PartitionPathRepartitionPartitioner.java       | 12 +++-
 ...artitionPathRepartitionPartitionerWithRows.java | 12 +++-
 .../PartitionSortPartitionerWithRows.java          | 14 ++++
 .../bulkinsert/RDDPartitionSortPartitioner.java    | 14 ++++
 .../TestBulkInsertInternalPartitioner.java         | 83 ++++++++++++++--------
 .../TestBulkInsertInternalPartitionerForRows.java  | 69 ++++++++++++------
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  5 +-
 14 files changed, 227 insertions(+), 85 deletions(-)
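
In short: the sort-based bulk insert modes (GLOBAL_SORT, PARTITION_SORT,
PARTITION_PATH_REPARTITION, PARTITION_PATH_REPARTITION_AND_SORT) rely on the
_hoodie_partition_path/_hoodie_record_key meta fields, so the partitioners now
fail fast when populateMetaFields is disabled, and the factories take the whole
HoodieWriteConfig to thread that flag through. A minimal sketch of the new
behavior, mirroring the tests below (the path is illustrative, avroSchema is
any valid table schema string, and rows stands for an incoming Dataset<Row>):

    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
    import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
    import org.apache.hudi.table.BulkInsertPartitioner;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    class PopulateMetaFieldsGuardSketch {
      static void sketch(Dataset<Row> rows, String avroSchema) {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi-table")                                   // illustrative path
            .withSchema(avroSchema)
            .withBulkInsertSortMode(BulkInsertSortMode.GLOBAL_SORT.name())
            .withPopulateMetaFields(false)                                 // meta fields disabled
            .build();

        BulkInsertPartitioner<Dataset<Row>> partitioner =
            BulkInsertInternalPartitionerWithRowsFactory.get(config, true, true);

        // Now throws HoodieException("GLOBAL_SORT mode requires meta-fields to
        // be enabled") instead of sorting on columns that would not exist.
        partitioner.repartitionRecords(rows, 2);
      }
    }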

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 074deaa6212..954daaad1e1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -206,10 +206,8 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa
           throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
       }
     }).orElse(isRowPartitioner
-        ? BulkInsertInternalPartitionerWithRowsFactory.get(
-        getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true)
-        : BulkInsertInternalPartitionerFactory.get(
-        getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true));
+        ? BulkInsertInternalPartitionerWithRowsFactory.get(getWriteConfig(), getHoodieTable().isPartitioned(), true)
+        : BulkInsertInternalPartitionerFactory.get(getHoodieTable(), getWriteConfig(), true));
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
index d961c8f9de3..657c671f523 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
@@ -30,37 +30,37 @@ import org.apache.hudi.table.HoodieTable;
  */
 public abstract class BulkInsertInternalPartitionerFactory {
 
-  public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) {
+  public static BulkInsertPartitioner get(HoodieTable table,
+                                          HoodieWriteConfig config) {
     return get(table, config, false);
   }
 
-  public static BulkInsertPartitioner get(
-      HoodieTable table, HoodieWriteConfig config, boolean enforceNumOutputPartitions) {
+  public static BulkInsertPartitioner get(HoodieTable table,
+                                          HoodieWriteConfig config,
+                                          boolean enforceNumOutputPartitions) {
     if (config.getIndexType().equals(HoodieIndex.IndexType.BUCKET)
         && config.getBucketIndexEngineType().equals(HoodieIndex.BucketIndexEngineType.CONSISTENT_HASHING)) {
       return new RDDConsistentBucketPartitioner(table);
     }
-    return get(config.getBulkInsertSortMode(), table.isPartitioned(), enforceNumOutputPartitions);
-  }
-
-  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) {
-    return get(sortMode, isTablePartitioned, false);
+    return get(config, table.isPartitioned(), enforceNumOutputPartitions);
   }
 
-  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode,
+  public static BulkInsertPartitioner get(HoodieWriteConfig config,
                                           boolean isTablePartitioned,
                                           boolean enforceNumOutputPartitions) {
+    BulkInsertSortMode sortMode = config.getBulkInsertSortMode();
+
     switch (sortMode) {
       case NONE:
         return new NonSortPartitioner(enforceNumOutputPartitions);
       case GLOBAL_SORT:
-        return new GlobalSortPartitioner();
+        return new GlobalSortPartitioner(config);
       case PARTITION_SORT:
-        return new RDDPartitionSortPartitioner();
+        return new RDDPartitionSortPartitioner(config);
       case PARTITION_PATH_REPARTITION:
-        return new PartitionPathRepartitionPartitioner(isTablePartitioned);
+        return new PartitionPathRepartitionPartitioner(isTablePartitioned, config);
       case PARTITION_PATH_REPARTITION_AND_SORT:
-        return new PartitionPathRepartitionAndSortPartitioner(isTablePartitioned);
+        return new PartitionPathRepartitionAndSortPartitioner(isTablePartitioned, config);
       default:
         throw new HoodieException("The bulk insert sort mode \"" + sortMode.name() + "\" is not supported.");
     }
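
The record (RDD) path gets the same guard. The sort mode is now read from the
config inside get(...), so callers hand over the whole HoodieWriteConfig (or
the HoodieTable plus config, as in the clustering strategy above) instead of a
bare BulkInsertSortMode. A hedged sketch of a call site, reusing the config
built in the example above together with an existing JavaRDD<HoodieRecord>
named records:

    // Dispatches on config.getBulkInsertSortMode(); with populateMetaFields=false,
    // any sort-based partitioner throws
    // HoodieException("<MODE> mode requires meta-fields to be enabled").
    BulkInsertPartitioner partitioner =
        BulkInsertInternalPartitionerFactory.get(config, true, true);
    partitioner.repartitionRecords(records, 2);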
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
index 218eae0dc94..07995e50d6a 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.sql.Dataset;
@@ -29,24 +30,26 @@ import org.apache.spark.sql.Row;
  */
 public abstract class BulkInsertInternalPartitionerWithRowsFactory {
 
-  public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode sortMode,
+  public static BulkInsertPartitioner<Dataset<Row>> get(HoodieWriteConfig config,
                                                         boolean isTablePartitioned) {
-    return get(sortMode, isTablePartitioned, false);
+    return get(config, isTablePartitioned, false);
   }
 
-  public static BulkInsertPartitioner<Dataset<Row>> get(
-      BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions) {
+  public static BulkInsertPartitioner<Dataset<Row>> get(HoodieWriteConfig config,
+                                                        boolean isTablePartitioned,
+                                                        boolean enforceNumOutputPartitions) {
+    BulkInsertSortMode sortMode = config.getBulkInsertSortMode();
     switch (sortMode) {
       case NONE:
         return new NonSortPartitionerWithRows(enforceNumOutputPartitions);
       case GLOBAL_SORT:
-        return new GlobalSortPartitionerWithRows();
+        return new GlobalSortPartitionerWithRows(config);
       case PARTITION_SORT:
-        return new PartitionSortPartitionerWithRows();
+        return new PartitionSortPartitionerWithRows(config);
       case PARTITION_PATH_REPARTITION:
-        return new PartitionPathRepartitionPartitionerWithRows(isTablePartitioned);
+        return new PartitionPathRepartitionPartitionerWithRows(isTablePartitioned, config);
       case PARTITION_PATH_REPARTITION_AND_SORT:
-        return new PartitionPathRepartitionAndSortPartitionerWithRows(isTablePartitioned);
+        return new PartitionPathRepartitionAndSortPartitionerWithRows(isTablePartitioned, config);
       default:
         throw new UnsupportedOperationException("The bulk insert sort mode \"" + sortMode.name() + "\" is not supported.");
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java
index a184c009a1b..e10d23743da 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitioner.java
@@ -20,10 +20,14 @@ package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.api.java.JavaRDD;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.GLOBAL_SORT;
+
 /**
  * A built-in partitioner that does global sorting for the input records across partitions
  * after repartition for bulk insert operation, corresponding to the
@@ -34,9 +38,19 @@ import org.apache.spark.api.java.JavaRDD;
 public class GlobalSortPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
+  private final boolean shouldPopulateMetaFields;
+
+  public GlobalSortPartitioner(HoodieWriteConfig config) {
+    this.shouldPopulateMetaFields = config.populateMetaFields();
+  }
+
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(GLOBAL_SORT.name() + " mode requires meta-fields to be enabled");
+    }
+
     // Now, sort the records and line them up nicely for loading.
     return records.sortBy(record -> {
       // Let's use "partitionPath + key" as the sort key. Spark, will ensure
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java
index 24bcc0aff0d..1a8e87a3823 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java
@@ -19,19 +19,33 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.functions;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.GLOBAL_SORT;
+
 /**
  * A built-in partitioner that does global sorting for the input Rows across partitions after repartition for bulk insert operation, corresponding to the {@code BulkInsertSortMode.GLOBAL_SORT} mode.
  */
 public class GlobalSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {
 
+  private final boolean shouldPopulateMetaFields;
+
+  public GlobalSortPartitionerWithRows(HoodieWriteConfig config) {
+    this.shouldPopulateMetaFields = config.populateMetaFields();
+  }
+
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(GLOBAL_SORT.name() + " mode requires meta-fields to be enabled");
+    }
+
     // Now, sort the records and line them up nicely for loading.
     // Let's use "partitionPath + key" as the sort key.
     return rows.sort(functions.col(HoodieRecord.PARTITION_PATH_METADATA_FIELD), functions.col(HoodieRecord.RECORD_KEY_METADATA_FIELD))
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java
index e8e1e2072f5..a47f1f9df43 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitioner.java
@@ -21,12 +21,16 @@ package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.api.java.JavaRDD;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT;
+
 /**
  * A built-in partitioner that does the following for input records for bulk insert operation
  * <p>
@@ -45,14 +49,20 @@ public class PartitionPathRepartitionAndSortPartitioner<T extends HoodieRecordPa
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
   private final boolean isTablePartitioned;
+  private final boolean shouldPopulateMetaFields;
 
-  public PartitionPathRepartitionAndSortPartitioner(boolean isTablePartitioned) {
+  public PartitionPathRepartitionAndSortPartitioner(boolean isTablePartitioned, HoodieWriteConfig config) {
     this.isTablePartitioned = isTablePartitioned;
+    this.shouldPopulateMetaFields = config.populateMetaFields();
   }
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(PARTITION_PATH_REPARTITION_AND_SORT.name() + " mode requires meta-fields to be enabled");
+    }
+
     if (isTablePartitioned) {
       PartitionPathRDDPartitioner partitioner = new PartitionPathRDDPartitioner(
           (partitionPath) -> (String) partitionPath, outputSparkPartitions);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java
index cf3ff1acfa4..ff505624913 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionAndSortPartitionerWithRows.java
@@ -20,12 +20,16 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT;
+
 /**
  * A built-in partitioner that does the following for input rows for bulk insert operation
  * <p>
@@ -41,13 +45,19 @@ import org.apache.spark.sql.Row;
 public class PartitionPathRepartitionAndSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {
 
   private final boolean isTablePartitioned;
+  private final boolean shouldPopulateMetaFields;
 
-  public PartitionPathRepartitionAndSortPartitionerWithRows(boolean isTablePartitioned) {
+  public PartitionPathRepartitionAndSortPartitionerWithRows(boolean isTablePartitioned, HoodieWriteConfig config) {
     this.isTablePartitioned = isTablePartitioned;
+    this.shouldPopulateMetaFields = config.populateMetaFields();
   }
 
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(PARTITION_PATH_REPARTITION_AND_SORT.name() + " mode requires meta-fields to be enabled");
+    }
+
     if (isTablePartitioned) {
       return rows.repartition(outputSparkPartitions, new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD))
           .sortWithinPartitions(new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java
index 5931b565757..393e47a61e3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitioner.java
@@ -21,12 +21,16 @@ package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.api.java.JavaRDD;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION;
+
 /**
  * A built-in partitioner that does the following for input records for bulk insert operation
  * <p>
@@ -44,14 +48,20 @@ public class PartitionPathRepartitionPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
   private final boolean isTablePartitioned;
+  private final boolean shouldPopulateMetaFields;
 
-  public PartitionPathRepartitionPartitioner(boolean isTablePartitioned) {
+  public PartitionPathRepartitionPartitioner(boolean isTablePartitioned, HoodieWriteConfig config) {
     this.isTablePartitioned = isTablePartitioned;
+    this.shouldPopulateMetaFields = config.populateMetaFields();
   }
 
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(PARTITION_PATH_REPARTITION.name() + " mode requires meta-fields to be enabled");
+    }
+
     if (isTablePartitioned) {
       PartitionPathRDDPartitioner partitioner = new PartitionPathRDDPartitioner(
           (partitionPath) -> (String) partitionPath, outputSparkPartitions);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java
index 62d9edbca84..966648d579c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionPathRepartitionPartitionerWithRows.java
@@ -20,12 +20,16 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_PATH_REPARTITION;
+
 /**
  * A built-in partitioner that does the following for input rows for bulk insert operation
  * <p>
@@ -40,13 +44,19 @@ import org.apache.spark.sql.Row;
 public class PartitionPathRepartitionPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {
 
   private final boolean isTablePartitioned;
+  private final boolean shouldPopulateMetaFields;
 
-  public PartitionPathRepartitionPartitionerWithRows(boolean isTablePartitioned) {
+  public PartitionPathRepartitionPartitionerWithRows(boolean isTablePartitioned, HoodieWriteConfig config) {
     this.isTablePartitioned = isTablePartitioned;
+    this.shouldPopulateMetaFields = config.populateMetaFields();
   }
 
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(PARTITION_PATH_REPARTITION.name() + " mode requires meta-fields to be enabled");
+    }
+
     if (isTablePartitioned) {
       return rows.repartition(outputSparkPartitions, new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD));
     }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java
index b669c338f86..c5c21f14a8e 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/PartitionSortPartitionerWithRows.java
@@ -19,18 +19,32 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_SORT;
+
 /**
  * A built-in partitioner that does local sorting for each spark partitions after coalesce for bulk insert operation, corresponding to the {@code BulkInsertSortMode.PARTITION_SORT} mode.
  */
 public class PartitionSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> {
 
+  private final boolean shouldPopulateMetaFields;
+
+  public PartitionSortPartitionerWithRows(HoodieWriteConfig config) {
+    this.shouldPopulateMetaFields = config.populateMetaFields();
+  }
+
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(PARTITION_SORT.name() + " mode requires meta-fields to be enabled");
+    }
+
     return rows.coalesce(outputSparkPartitions).sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.RECORD_KEY_METADATA_FIELD);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
index 9526ad58564..b5dc83cc2fa 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDPartitionSortPartitioner.java
@@ -20,6 +20,8 @@ package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 
 import org.apache.spark.api.java.JavaRDD;
@@ -30,6 +32,8 @@ import java.util.List;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.execution.bulkinsert.BulkInsertSortMode.PARTITION_SORT;
+
 /**
  * A built-in partitioner that does local sorting for each RDD partition
  * after coalesce for bulk insert operation, corresponding to the
@@ -40,9 +44,19 @@ import scala.Tuple2;
 public class RDDPartitionSortPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
+  private final boolean shouldPopulateMetaFields;
+
+  public RDDPartitionSortPartitioner(HoodieWriteConfig config) {
+    this.shouldPopulateMetaFields = config.populateMetaFields();
+  }
+
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
                                                      int outputSparkPartitions) {
+    if (!shouldPopulateMetaFields) {
+      throw new HoodieException(PARTITION_SORT.name() + " mode requires meta-fields to be enabled");
+    }
+
     return records.coalesce(outputSparkPartitions)
         .mapToPair(record ->
             new Tuple2<>(
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index 7bc64b54457..11420c180eb 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
@@ -48,6 +49,7 @@ import java.util.stream.Stream;
 
 import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase implements Serializable {
   private static final Comparator<HoodieRecord<? extends HoodieRecordPayload>> KEY_COMPARATOR =
@@ -83,16 +85,21 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
     //   boolean isTablePartitioned,
     //   boolean enforceNumOutputPartitions,
     //   boolean isGloballySorted,
-    //   boolean isLocallySorted
+    //   boolean isLocallySorted,
+    //   boolean populateMetaFields
     Object[][] data = new Object[][] {
-        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
-        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false},
-        {BulkInsertSortMode.NONE, true, true, false, false},
-        {BulkInsertSortMode.NONE, true, false, false, false}
+        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true},
+        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false, true},
+        {BulkInsertSortMode.NONE, true, true, false, false, true},
+        {BulkInsertSortMode.NONE, true, false, false, false, true},
+        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false},
+        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, false}
     };
     return Stream.of(data).map(Arguments::of);
   }
@@ -109,7 +116,8 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
                                                  boolean enforceNumOutputPartitions,
                                                  boolean isGloballySorted,
                                                  boolean isLocallySorted,
-                                                 Map<String, Long> expectedPartitionNumRecords) {
+                                                 Map<String, Long> expectedPartitionNumRecords,
+                                                 boolean populateMetaFields) {
     testBulkInsertInternalPartitioner(
         partitioner,
         records,
@@ -117,7 +125,8 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
         isGloballySorted,
         isLocallySorted,
         expectedPartitionNumRecords,
-        Option.empty());
+        Option.empty(),
+        populateMetaFields);
   }
 
   private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
@@ -126,8 +135,13 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
                                                  boolean isGloballySorted,
                                                  boolean isLocallySorted,
                                                  Map<String, Long> expectedPartitionNumRecords,
-                                                 Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
+                                                 Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator,
+                                                 boolean populateMetaFields) {
     int numPartitions = 2;
+    if (!populateMetaFields) {
+      assertThrows(HoodieException.class, () -> partitioner.repartitionRecords(records, numPartitions));
+      return;
+    }
     JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords =
         (JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) partitioner.repartitionRecords(records, numPartitions);
     assertEquals(
@@ -163,25 +177,35 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
                                                 boolean isTablePartitioned,
                                                 boolean enforceNumOutputPartitions,
                                                 boolean isGloballySorted,
-                                                boolean isLocallySorted) {
+                                                boolean isLocallySorted,
+                                                boolean populateMetaFields) {
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
+
+    HoodieWriteConfig config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withBulkInsertSortMode(sortMode.name())
+        .withPopulateMetaFields(populateMetaFields)
+        .build();
+
     testBulkInsertInternalPartitioner(
-        BulkInsertInternalPartitionerFactory.get(
-            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        BulkInsertInternalPartitionerFactory.get(config, isTablePartitioned, enforceNumOutputPartitions),
         records1,
         enforceNumOutputPartitions,
         isGloballySorted,
         isLocallySorted,
-        generateExpectedPartitionNumRecords(records1));
+        generateExpectedPartitionNumRecords(records1),
+        populateMetaFields);
     testBulkInsertInternalPartitioner(
-        BulkInsertInternalPartitionerFactory.get(
-            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        BulkInsertInternalPartitionerFactory.get(config, isTablePartitioned, enforceNumOutputPartitions),
         records2,
         enforceNumOutputPartitions,
         isGloballySorted,
         isLocallySorted,
-        generateExpectedPartitionNumRecords(records2));
+        generateExpectedPartitionNumRecords(records2),
+        populateMetaFields);
   }
 
   @Test
@@ -193,22 +217,21 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc);
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
-        records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+        records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true);
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false),
-        records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+        records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true);
 
     HoodieWriteConfig config = HoodieWriteConfig
-            .newBuilder()
-            .withPath("/")
-            .withSchema(TRIP_EXAMPLE_SCHEMA)
-            .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
-            .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
-            .build();
+        .newBuilder()
+        .withPath("/")
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withUserDefinedBulkInsertPartitionerClass(RDDCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+        .build();
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
-        records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+        records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator), true);
     testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config),
-        records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
-
+        records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator), true);
   }
 
   private Comparator<HoodieRecord<? extends HoodieRecordPayload>> getCustomColumnComparator(Schema schema, String[] sortColumns) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
index de827f7a450..6332e00ba67 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
@@ -22,6 +22,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -44,7 +45,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Unit tests {@link BulkInsertPartitioner}s with Rows.
@@ -53,6 +56,7 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
 
   private static final Comparator<Row> KEY_COMPARATOR =
       Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
+
   @BeforeEach
   public void setUp() throws Exception {
     initSparkContexts("TestBulkInsertInternalPartitionerForRows");
@@ -71,16 +75,21 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
     //   boolean isTablePartitioned,
     //   boolean enforceNumOutputPartitions,
     //   boolean isGloballySorted,
-    //   boolean isLocallySorted
+    //   boolean isLocallySorted,
+    //   boolean populateMetaFields
     Object[][] data = new Object[][] {
-        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
-        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false},
-        {BulkInsertSortMode.NONE, true, true, false, false},
-        {BulkInsertSortMode.NONE, true, false, false, false}
+        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, true},
+        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false, true},
+        {BulkInsertSortMode.NONE, true, true, false, false, true},
+        {BulkInsertSortMode.NONE, true, false, false, false, true},
+        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true, false},
+        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true, false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false, false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false, false}
     };
     return Stream.of(data).map(Arguments::of);
   }
@@ -91,28 +100,37 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
                                                 boolean isTablePartitioned,
                                                 boolean enforceNumOutputPartitions,
                                                 boolean isGloballySorted,
-                                                boolean isLocallySorted)
-      throws Exception {
+                                                boolean isLocallySorted,
+                                                boolean populateMetaFields) {
     Dataset<Row> records1 = generateTestRecords();
     Dataset<Row> records2 = generateTestRecords();
+
+    HoodieWriteConfig config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withSchema(TRIP_EXAMPLE_SCHEMA)
+        .withBulkInsertSortMode(sortMode.name())
+        .withPopulateMetaFields(populateMetaFields)
+        .build();
+
     testBulkInsertInternalPartitioner(
-        BulkInsertInternalPartitionerWithRowsFactory.get(
-            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions),
         records1,
         enforceNumOutputPartitions,
         isGloballySorted,
         isLocallySorted,
         generateExpectedPartitionNumRecords(records1),
-        Option.empty());
+        Option.empty(),
+        populateMetaFields);
     testBulkInsertInternalPartitioner(
-        BulkInsertInternalPartitionerWithRowsFactory.get(
-            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        BulkInsertInternalPartitionerWithRowsFactory.get(config, isTablePartitioned, enforceNumOutputPartitions),
         records2,
         enforceNumOutputPartitions,
         isGloballySorted,
         isLocallySorted,
         generateExpectedPartitionNumRecords(records2),
-        Option.empty());
+        Option.empty(),
+        populateMetaFields);
   }
 
   @Test
@@ -124,9 +142,9 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
     Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
 
     testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
-        records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+        records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true);
     testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
-        records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
+        records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true);
 
     HoodieWriteConfig config = HoodieWriteConfig
         .newBuilder()
@@ -135,9 +153,9 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
         .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
         .build();
     testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
-        records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+        records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator), true);
     testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
-        records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
+        records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator), true);
   }
 
   private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
@@ -146,8 +164,13 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
                                                  boolean isGloballySorted,
                                                  boolean isLocallySorted,
                                                  Map<String, Long> expectedPartitionNumRecords,
-                                                 Option<Comparator<Row>> comparator) {
+                                                 Option<Comparator<Row>> comparator,
+                                                 boolean populateMetaFields) {
     int numPartitions = 2;
+    if (!populateMetaFields) {
+      assertThrows(HoodieException.class, () -> partitioner.repartitionRecords(rows, numPartitions));
+      return;
+    }
     Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions);
     assertEquals(
         enforceNumOutputPartitions ? numPartitions : rows.rdd().getNumPartitions(),
@@ -198,7 +221,7 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
 
   private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) {
     List<Row> expectedRecords = new ArrayList<>(records);
-    Collections.sort(expectedRecords,comparator.orElse(KEY_COMPARATOR));
+    Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
     assertEquals(expectedRecords, records);
   }
 
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index f0ede2b2b82..06f85fca022 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -735,8 +735,7 @@ object HoodieSparkSqlWriter {
       if (userDefinedBulkInsertPartitionerOpt.isPresent) {
         userDefinedBulkInsertPartitionerOpt.get
       } else {
-        BulkInsertInternalPartitionerWithRowsFactory.get(
-          writeConfig.getBulkInsertSortMode, isTablePartitioned)
+        BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig, isTablePartitioned)
       }
     } else {
       // Sort modes are not yet supported when meta fields are disabled
@@ -981,7 +980,7 @@ object HoodieSparkSqlWriter {
   }
 
   private def mergeParamsAndGetHoodieConfig(optParams: Map[String, String],
-      tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
+                                            tableConfig: HoodieTableConfig, mode: SaveMode): (Map[String, String], HoodieConfig) = {
     val translatedOptions = DataSourceWriteOptions.translateSqlOptions(optParams)
     val mergedParams = mutable.Map.empty ++ HoodieWriterUtils.parametersWithWriteDefaults(translatedOptions)
     if (!mergedParams.contains(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME.key)

