This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit ca3dbbcfa57e0b6836e9545d98c95b81397120d8
Author: Y Ethan Guo <ethan.guoyi...@gmail.com>
AuthorDate: Fri Dec 9 21:41:48 2022 -0800

    [HUDI-5338] Adjust coalesce behavior within NONE sort mode for bulk insert 
(#7396)
    
    This PR adjusts NONE sort mode for bulk insert so that, by default, 
coalesce is not applied, matching the default parquet write behavior. The NONE 
sort mode still applies coalesce for clustering as the clustering operation 
relies on the bulk insert and the specified number of output Spark partitions 
to write a specific number of files.
---
 .../apache/hudi/table/BulkInsertPartitioner.java   | 18 ++++--
 .../MultipleSparkJobExecutionStrategy.java         |  4 +-
 .../BulkInsertInternalPartitionerFactory.java      | 15 ++++-
 ...lkInsertInternalPartitionerWithRowsFactory.java |  7 ++-
 .../execution/bulkinsert/NonSortPartitioner.java   | 33 +++++++++-
 .../bulkinsert/NonSortPartitionerWithRows.java     | 34 ++++++++--
 .../TestBulkInsertInternalPartitioner.java         | 73 ++++++++++++++++------
 .../TestBulkInsertInternalPartitionerForRows.java  | 69 +++++++++++++-------
 8 files changed, 194 insertions(+), 59 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
index 89360c24740..844e9f4f0f8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java
@@ -20,22 +20,28 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.io.WriteHandleFactory;
 
 import java.io.Serializable;
 
 /**
- * Repartition input records into at least expected number of output 
partitions. It should give below guarantees -
- * Output partition will have records from only one hoodie partition. - 
Average records per output
- * partitions should be almost equal to (#inputRecords / #outputPartitions) to 
avoid possible skews.
+ * Partitions the input records for bulk insert operation.
+ * <p>
+ * The actual implementation of {@link BulkInsertPartitioner} is determined by 
the bulk insert
+ * sort mode, {@link BulkInsertSortMode}, specified by
+ * {@code HoodieWriteConfig.BULK_INSERT_SORT_MODE} 
(`hoodie.bulkinsert.sort.mode`).
  */
 public interface BulkInsertPartitioner<I> extends Serializable {
 
   /**
-   * Repartitions the input records into at least expected number of output 
partitions.
+   * Partitions the input records based on the number of output partitions as 
a hint.
+   * <p>
+   * Note that, the number of output partitions may or may not be enforced, 
depending on the
+   * specific implementation.
    *
-   * @param records          Input Hoodie records
-   * @param outputPartitions Expected number of output partitions
+   * @param records          Input Hoodie records.
+   * @param outputPartitions Expected number of output partitions as a hint.
    * @return
    */
   I repartitionRecords(I records, int outputPartitions);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 15a72079910..03a0c6e36b0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -207,9 +207,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T 
extends HoodieRecordPa
       }
     }).orElse(isRowPartitioner
         ? BulkInsertInternalPartitionerWithRowsFactory.get(
-        getWriteConfig().getBulkInsertSortMode(), 
getHoodieTable().isPartitioned())
+        getWriteConfig().getBulkInsertSortMode(), 
getHoodieTable().isPartitioned(), true)
         : BulkInsertInternalPartitionerFactory.get(
-        getWriteConfig().getBulkInsertSortMode(), 
getHoodieTable().isPartitioned()));
+        getWriteConfig().getBulkInsertSortMode(), 
getHoodieTable().isPartitioned(), true));
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
index 84bd79e3a22..900d2729f10 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java
@@ -30,13 +30,24 @@ import org.apache.hudi.table.HoodieTable;
 public abstract class BulkInsertInternalPartitionerFactory {
 
   public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig 
config) {
-    return get(config.getBulkInsertSortMode(), table.isPartitioned());
+    return get(config.getBulkInsertSortMode(), table.isPartitioned(), false);
+  }
+
+  public static BulkInsertPartitioner get(
+      HoodieTable table, HoodieWriteConfig config, boolean 
enforceNumOutputPartitions) {
+    return get(config.getBulkInsertSortMode(), table.isPartitioned(), 
enforceNumOutputPartitions);
   }
 
   public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean 
isTablePartitioned) {
+    return get(sortMode, isTablePartitioned, false);
+  }
+
+  public static BulkInsertPartitioner get(BulkInsertSortMode sortMode,
+                                          boolean isTablePartitioned,
+                                          boolean enforceNumOutputPartitions) {
     switch (sortMode) {
       case NONE:
-        return new NonSortPartitioner();
+        return new NonSortPartitioner(enforceNumOutputPartitions);
       case GLOBAL_SORT:
         return new GlobalSortPartitioner();
       case PARTITION_SORT:
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
index 720c03948d4..218eae0dc94 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java
@@ -31,9 +31,14 @@ public abstract class 
BulkInsertInternalPartitionerWithRowsFactory {
 
   public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode 
sortMode,
                                                         boolean 
isTablePartitioned) {
+    return get(sortMode, isTablePartitioned, false);
+  }
+
+  public static BulkInsertPartitioner<Dataset<Row>> get(
+      BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean 
enforceNumOutputPartitions) {
     switch (sortMode) {
       case NONE:
-        return new NonSortPartitionerWithRows();
+        return new NonSortPartitionerWithRows(enforceNumOutputPartitions);
       case GLOBAL_SORT:
         return new GlobalSortPartitionerWithRows();
       case PARTITION_SORT:
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java
index 19c90ecb1a0..67cd599731c 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java
@@ -25,18 +25,45 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.spark.api.java.JavaRDD;
 
 /**
- * A built-in partitioner that only does coalesce for input records for bulk 
insert operation,
- * corresponding to the {@code BulkInsertSortMode.NONE} mode.
+ * A built-in partitioner that avoids expensive sorting for the input records 
for bulk insert
+ * operation, by doing either of the following:
+ * <p>
+ * - If enforcing the outputSparkPartitions, only does coalesce for input 
records;
+ * <p>
+ * - Otherwise, returns input records as is.
+ * <p>
+ * Corresponds to the {@code BulkInsertSortMode.NONE} mode.
  *
  * @param <T> HoodieRecordPayload type
  */
 public class NonSortPartitioner<T extends HoodieRecordPayload>
     implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {
 
+  private final boolean enforceNumOutputPartitions;
+
+  /**
+   * Default constructor without enforcing the number of output partitions.
+   */
+  public NonSortPartitioner() {
+    this(false);
+  }
+
+  /**
+   * Constructor with `enforceNumOutputPartitions` config.
+   *
+   * @param enforceNumOutputPartitions Whether to enforce the number of output 
partitions.
+   */
+  public NonSortPartitioner(boolean enforceNumOutputPartitions) {
+    this.enforceNumOutputPartitions = enforceNumOutputPartitions;
+  }
+
   @Override
   public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> 
records,
                                                      int 
outputSparkPartitions) {
-    return records.coalesce(outputSparkPartitions);
+    if (enforceNumOutputPartitions) {
+      return records.coalesce(outputSparkPartitions);
+    }
+    return records;
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java
index e1c34a8f840..10ec275064f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java
@@ -24,15 +24,41 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
 /**
- * A built-in partitioner that only does coalesce for input Rows for bulk 
insert operation,
- * corresponding to the {@code BulkInsertSortMode.NONE} mode.
- *
+ * A built-in partitioner that avoids expensive sorting for the input Rows for 
bulk insert
+ * operation, by doing either of the following:
+ * <p>
+ * - If enforcing the outputSparkPartitions, only does coalesce for input Rows;
+ * <p>
+ * - Otherwise, returns input Rows as is.
+ * <p>
+ * Corresponds to the {@code BulkInsertSortMode.NONE} mode.
  */
 public class NonSortPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
 
+  private final boolean enforceNumOutputPartitions;
+
+  /**
+   * Default constructor without enforcing the number of output partitions.
+   */
+  public NonSortPartitionerWithRows() {
+    this(false);
+  }
+
+  /**
+   * Constructor with `enforceNumOutputPartitions` config.
+   *
+   * @param enforceNumOutputPartitions Whether to enforce the number of output 
partitions.
+   */
+  public NonSortPartitionerWithRows(boolean enforceNumOutputPartitions) {
+    this.enforceNumOutputPartitions = enforceNumOutputPartitions;
+  }
+
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputSparkPartitions) {
-    return rows.coalesce(outputSparkPartitions);
+    if (enforceNumOutputPartitions) {
+      return rows.coalesce(outputSparkPartitions);
+    }
+    return rows;
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
index d04059e59f9..7bc64b54457 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java
@@ -78,14 +78,21 @@ public class TestBulkInsertInternalPartitioner extends 
HoodieClientTestBase impl
   }
 
   private static Stream<Arguments> configParams() {
+    // Parameters:
+    //   BulkInsertSortMode sortMode,
+    //   boolean isTablePartitioned,
+    //   boolean enforceNumOutputPartitions,
+    //   boolean isGloballySorted,
+    //   boolean isLocallySorted
     Object[][] data = new Object[][] {
-        {BulkInsertSortMode.GLOBAL_SORT, true, true, true},
-        {BulkInsertSortMode.PARTITION_SORT, true, false, true},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, 
false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, 
false},
-        {BulkInsertSortMode.NONE, true, false, false}
+        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
+        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, 
false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, 
false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, 
false, false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, 
false, false},
+        {BulkInsertSortMode.NONE, true, true, false, false},
+        {BulkInsertSortMode.NONE, true, false, false, false}
     };
     return Stream.of(data).map(Arguments::of);
   }
@@ -99,20 +106,33 @@ public class TestBulkInsertInternalPartitioner extends 
HoodieClientTestBase impl
 
   private void testBulkInsertInternalPartitioner(BulkInsertPartitioner 
partitioner,
                                                  JavaRDD<HoodieRecord> records,
-                                                 boolean isGloballySorted, 
boolean isLocallySorted,
+                                                 boolean 
enforceNumOutputPartitions,
+                                                 boolean isGloballySorted,
+                                                 boolean isLocallySorted,
                                                  Map<String, Long> 
expectedPartitionNumRecords) {
-    testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, 
isLocallySorted, expectedPartitionNumRecords, Option.empty());
+    testBulkInsertInternalPartitioner(
+        partitioner,
+        records,
+        enforceNumOutputPartitions,
+        isGloballySorted,
+        isLocallySorted,
+        expectedPartitionNumRecords,
+        Option.empty());
   }
 
   private void testBulkInsertInternalPartitioner(BulkInsertPartitioner 
partitioner,
                                                  JavaRDD<HoodieRecord> records,
-                                                 boolean isGloballySorted, 
boolean isLocallySorted,
+                                                 boolean 
enforceNumOutputPartitions,
+                                                 boolean isGloballySorted,
+                                                 boolean isLocallySorted,
                                                  Map<String, Long> 
expectedPartitionNumRecords,
                                                  
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
     int numPartitions = 2;
     JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords =
         (JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) 
partitioner.repartitionRecords(records, numPartitions);
-    assertEquals(numPartitions, actualRecords.getNumPartitions());
+    assertEquals(
+        enforceNumOutputPartitions ? numPartitions : 
records.getNumPartitions(),
+        actualRecords.getNumPartitions());
     List<HoodieRecord<? extends HoodieRecordPayload>> collectedActualRecords = 
actualRecords.collect();
     if (isGloballySorted) {
       // Verify global order
@@ -137,18 +157,31 @@ public class TestBulkInsertInternalPartitioner extends 
HoodieClientTestBase impl
     assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords);
   }
 
-  @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}")
+  @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} 
enforceNumOutputPartitions={2}")
   @MethodSource("configParams")
   public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
                                                 boolean isTablePartitioned,
+                                                boolean 
enforceNumOutputPartitions,
                                                 boolean isGloballySorted,
                                                 boolean isLocallySorted) {
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = 
generateTripleTestRecordsForBulkInsert(jsc);
-    
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode,
 isTablePartitioned),
-        records1, isGloballySorted, isLocallySorted, 
generateExpectedPartitionNumRecords(records1));
-    
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode,
 isTablePartitioned),
-        records2, isGloballySorted, isLocallySorted, 
generateExpectedPartitionNumRecords(records2));
+    testBulkInsertInternalPartitioner(
+        BulkInsertInternalPartitionerFactory.get(
+            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        records1,
+        enforceNumOutputPartitions,
+        isGloballySorted,
+        isLocallySorted,
+        generateExpectedPartitionNumRecords(records1));
+    testBulkInsertInternalPartitioner(
+        BulkInsertInternalPartitionerFactory.get(
+            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        records2,
+        enforceNumOutputPartitions,
+        isGloballySorted,
+        isLocallySorted,
+        generateExpectedPartitionNumRecords(records2));
   }
 
   @Test
@@ -160,9 +193,9 @@ public class TestBulkInsertInternalPartitioner extends 
HoodieClientTestBase impl
     JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc);
     JavaRDD<HoodieRecord> records2 = 
generateTripleTestRecordsForBulkInsert(jsc);
     testBulkInsertInternalPartitioner(new 
RDDCustomColumnsSortPartitioner(sortColumns, 
HoodieTestDataGenerator.AVRO_SCHEMA, false),
-        records1, true, true, generateExpectedPartitionNumRecords(records1), 
Option.of(columnComparator));
+        records1, true, true, true, 
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
     testBulkInsertInternalPartitioner(new 
RDDCustomColumnsSortPartitioner(sortColumns, 
HoodieTestDataGenerator.AVRO_SCHEMA, false),
-        records2, true, true, generateExpectedPartitionNumRecords(records2), 
Option.of(columnComparator));
+        records2, true, true, true, 
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
 
     HoodieWriteConfig config = HoodieWriteConfig
             .newBuilder()
@@ -172,9 +205,9 @@ public class TestBulkInsertInternalPartitioner extends 
HoodieClientTestBase impl
             .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
             .build();
     testBulkInsertInternalPartitioner(new 
RDDCustomColumnsSortPartitioner(config),
-            records1, true, true, 
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
+        records1, true, true, true, 
generateExpectedPartitionNumRecords(records1), Option.of(columnComparator));
     testBulkInsertInternalPartitioner(new 
RDDCustomColumnsSortPartitioner(config),
-            records2, true, true, 
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
+        records2, true, true, true, 
generateExpectedPartitionNumRecords(records2), Option.of(columnComparator));
 
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
index 6223c8bb9eb..de827f7a450 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
@@ -66,32 +66,53 @@ public class TestBulkInsertInternalPartitionerForRows 
extends HoodieClientTestHa
   }
 
   private static Stream<Arguments> configParams() {
+    // Parameters:
+    //   BulkInsertSortMode sortMode,
+    //   boolean isTablePartitioned,
+    //   boolean enforceNumOutputPartitions,
+    //   boolean isGloballySorted,
+    //   boolean isLocallySorted
     Object[][] data = new Object[][] {
-        {BulkInsertSortMode.GLOBAL_SORT, true, true, true},
-        {BulkInsertSortMode.PARTITION_SORT, true, false, true},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, 
false},
-        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, 
false},
-
-        {BulkInsertSortMode.NONE, true, false, false}
+        {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true},
+        {BulkInsertSortMode.PARTITION_SORT, true, true, false, true},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, 
false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, 
false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, 
false, false},
+        {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, 
false, false},
+        {BulkInsertSortMode.NONE, true, true, false, false},
+        {BulkInsertSortMode.NONE, true, false, false, false}
     };
     return Stream.of(data).map(Arguments::of);
   }
 
-  @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}")
+  @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} 
enforceNumOutputPartitions={2}")
   @MethodSource("configParams")
   public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode,
                                                 boolean isTablePartitioned,
+                                                boolean 
enforceNumOutputPartitions,
                                                 boolean isGloballySorted,
                                                 boolean isLocallySorted)
       throws Exception {
     Dataset<Row> records1 = generateTestRecords();
     Dataset<Row> records2 = generateTestRecords();
-    
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode,
 isTablePartitioned),
-        records1, isGloballySorted, isLocallySorted, 
generateExpectedPartitionNumRecords(records1), Option.empty());
-    
testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode,
 isTablePartitioned),
-        records2, isGloballySorted, isLocallySorted, 
generateExpectedPartitionNumRecords(records2), Option.empty());
+    testBulkInsertInternalPartitioner(
+        BulkInsertInternalPartitionerWithRowsFactory.get(
+            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        records1,
+        enforceNumOutputPartitions,
+        isGloballySorted,
+        isLocallySorted,
+        generateExpectedPartitionNumRecords(records1),
+        Option.empty());
+    testBulkInsertInternalPartitioner(
+        BulkInsertInternalPartitionerWithRowsFactory.get(
+            sortMode, isTablePartitioned, enforceNumOutputPartitions),
+        records2,
+        enforceNumOutputPartitions,
+        isGloballySorted,
+        isLocallySorted,
+        generateExpectedPartitionNumRecords(records2),
+        Option.empty());
   }
 
   @Test
@@ -103,9 +124,9 @@ public class TestBulkInsertInternalPartitionerForRows 
extends HoodieClientTestHa
     Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
 
     testBulkInsertInternalPartitioner(new 
RowCustomColumnsSortPartitioner(sortColumns),
-        records1, false, true, generateExpectedPartitionNumRecords(records1), 
Option.of(comparator));
+        records1, true, false, true, 
generateExpectedPartitionNumRecords(records1), Option.of(comparator));
     testBulkInsertInternalPartitioner(new 
RowCustomColumnsSortPartitioner(sortColumns),
-        records2, false, true, generateExpectedPartitionNumRecords(records2), 
Option.of(comparator));
+        records2, true, false, true, 
generateExpectedPartitionNumRecords(records2), Option.of(comparator));
 
     HoodieWriteConfig config = HoodieWriteConfig
         .newBuilder()
@@ -114,18 +135,24 @@ public class TestBulkInsertInternalPartitionerForRows 
extends HoodieClientTestHa
         .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
         .build();
     testBulkInsertInternalPartitioner(new 
RowCustomColumnsSortPartitioner(config),
-        records1, false, true, generateExpectedPartitionNumRecords(records1), 
Option.of(comparator));
+        records1, true, false, true, 
generateExpectedPartitionNumRecords(records1), Option.of(comparator));
     testBulkInsertInternalPartitioner(new 
RowCustomColumnsSortPartitioner(config),
-        records2, false, true, generateExpectedPartitionNumRecords(records2), 
Option.of(comparator));
+        records2, true, false, true, 
generateExpectedPartitionNumRecords(records2), Option.of(comparator));
   }
 
   private void testBulkInsertInternalPartitioner(BulkInsertPartitioner 
partitioner,
-      Dataset<Row> rows,
-      boolean isGloballySorted, boolean isLocallySorted,
-      Map<String, Long> expectedPartitionNumRecords,
-      Option<Comparator<Row>> comparator) {
+                                                 Dataset<Row> rows,
+                                                 boolean 
enforceNumOutputPartitions,
+                                                 boolean isGloballySorted,
+                                                 boolean isLocallySorted,
+                                                 Map<String, Long> 
expectedPartitionNumRecords,
+                                                 Option<Comparator<Row>> 
comparator) {
     int numPartitions = 2;
     Dataset<Row> actualRecords = (Dataset<Row>) 
partitioner.repartitionRecords(rows, numPartitions);
+    assertEquals(
+        enforceNumOutputPartitions ? numPartitions : 
rows.rdd().getNumPartitions(),
+        actualRecords.rdd().getNumPartitions());
+
     List<Row> collectedActualRecords = actualRecords.collectAsList();
     if (isGloballySorted) {
       // Verify global order

Reply via email to