This is an automated email from the ASF dual-hosted git repository. sivabalan pushed a commit to branch release-0.12.2-shadow in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ca3dbbcfa57e0b6836e9545d98c95b81397120d8 Author: Y Ethan Guo <ethan.guoyi...@gmail.com> AuthorDate: Fri Dec 9 21:41:48 2022 -0800 [HUDI-5338] Adjust coalesce behavior within NONE sort mode for bulk insert (#7396) This PR adjusts NONE sort mode for bulk insert so that, by default, coalesce is not applied, matching the default parquet write behavior. The NONE sort mode still applies coalesce for clustering as the clustering operation relies on the bulk insert and the specified number of output Spark partitions to write a specific number of files. --- .../apache/hudi/table/BulkInsertPartitioner.java | 18 ++++-- .../MultipleSparkJobExecutionStrategy.java | 4 +- .../BulkInsertInternalPartitionerFactory.java | 15 ++++- ...lkInsertInternalPartitionerWithRowsFactory.java | 7 ++- .../execution/bulkinsert/NonSortPartitioner.java | 33 +++++++++- .../bulkinsert/NonSortPartitionerWithRows.java | 34 ++++++++-- .../TestBulkInsertInternalPartitioner.java | 73 ++++++++++++++++------ .../TestBulkInsertInternalPartitionerForRows.java | 69 +++++++++++++------- 8 files changed, 194 insertions(+), 59 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java index 89360c24740..844e9f4f0f8 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/BulkInsertPartitioner.java @@ -20,22 +20,28 @@ package org.apache.hudi.table; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.io.WriteHandleFactory; import java.io.Serializable; /** - * Repartition input records into at least expected number of output partitions. It should give below guarantees - - * Output partition will have records from only one hoodie partition. - Average records per output - * partitions should be almost equal to (#inputRecords / #outputPartitions) to avoid possible skews. + * Partitions the input records for bulk insert operation. + * <p> + * The actual implementation of {@link BulkInsertPartitioner} is determined by the bulk insert + * sort mode, {@link BulkInsertSortMode}, specified by + * {@code HoodieWriteConfig.BULK_INSERT_SORT_MODE} (`hoodie.bulkinsert.sort.mode`). */ public interface BulkInsertPartitioner<I> extends Serializable { /** - * Repartitions the input records into at least expected number of output partitions. + * Partitions the input records based on the number of output partitions as a hint. + * <p> + * Note that, the number of output partitions may or may not be enforced, depending on the + * specific implementation. * - * @param records Input Hoodie records - * @param outputPartitions Expected number of output partitions + * @param records Input Hoodie records. + * @param outputPartitions Expected number of output partitions as a hint. * @return */ I repartitionRecords(I records, int outputPartitions); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java index 15a72079910..03a0c6e36b0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java @@ -207,9 +207,9 @@ public abstract class MultipleSparkJobExecutionStrategy<T extends HoodieRecordPa } }).orElse(isRowPartitioner ? BulkInsertInternalPartitionerWithRowsFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned()) + getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true) : BulkInsertInternalPartitionerFactory.get( - getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned())); + getWriteConfig().getBulkInsertSortMode(), getHoodieTable().isPartitioned(), true)); } /** diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java index 84bd79e3a22..900d2729f10 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerFactory.java @@ -30,13 +30,24 @@ import org.apache.hudi.table.HoodieTable; public abstract class BulkInsertInternalPartitionerFactory { public static BulkInsertPartitioner get(HoodieTable table, HoodieWriteConfig config) { - return get(config.getBulkInsertSortMode(), table.isPartitioned()); + return get(config.getBulkInsertSortMode(), table.isPartitioned(), false); + } + + public static BulkInsertPartitioner get( + HoodieTable table, HoodieWriteConfig config, boolean enforceNumOutputPartitions) { + return get(config.getBulkInsertSortMode(), table.isPartitioned(), enforceNumOutputPartitions); } public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, boolean isTablePartitioned) { + return get(sortMode, isTablePartitioned, false); + } + + public static BulkInsertPartitioner get(BulkInsertSortMode sortMode, + boolean isTablePartitioned, + boolean enforceNumOutputPartitions) { switch (sortMode) { case NONE: - return new NonSortPartitioner(); + return new NonSortPartitioner(enforceNumOutputPartitions); case GLOBAL_SORT: return new GlobalSortPartitioner(); case PARTITION_SORT: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java index 720c03948d4..218eae0dc94 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BulkInsertInternalPartitionerWithRowsFactory.java @@ -31,9 +31,14 @@ public abstract class BulkInsertInternalPartitionerWithRowsFactory { public static BulkInsertPartitioner<Dataset<Row>> get(BulkInsertSortMode sortMode, boolean isTablePartitioned) { + return get(sortMode, isTablePartitioned, false); + } + + public static BulkInsertPartitioner<Dataset<Row>> get( + BulkInsertSortMode sortMode, boolean isTablePartitioned, boolean enforceNumOutputPartitions) { switch (sortMode) { case NONE: - return new NonSortPartitionerWithRows(); + return new NonSortPartitionerWithRows(enforceNumOutputPartitions); case GLOBAL_SORT: return new GlobalSortPartitionerWithRows(); case PARTITION_SORT: diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java index 19c90ecb1a0..67cd599731c 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitioner.java @@ -25,18 +25,45 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.spark.api.java.JavaRDD; /** - * A built-in partitioner that only does coalesce for input records for bulk insert operation, - * corresponding to the {@code BulkInsertSortMode.NONE} mode. + * A built-in partitioner that avoids expensive sorting for the input records for bulk insert + * operation, by doing either of the following: + * <p> + * - If enforcing the outputSparkPartitions, only does coalesce for input records; + * <p> + * - Otherwise, returns input records as is. + * <p> + * Corresponds to the {@code BulkInsertSortMode.NONE} mode. * * @param <T> HoodieRecordPayload type */ public class NonSortPartitioner<T extends HoodieRecordPayload> implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> { + private final boolean enforceNumOutputPartitions; + + /** + * Default constructor without enforcing the number of output partitions. + */ + public NonSortPartitioner() { + this(false); + } + + /** + * Constructor with `enforceNumOutputPartitions` config. + * + * @param enforceNumOutputPartitions Whether to enforce the number of output partitions. + */ + public NonSortPartitioner(boolean enforceNumOutputPartitions) { + this.enforceNumOutputPartitions = enforceNumOutputPartitions; + } + @Override public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) { - return records.coalesce(outputSparkPartitions); + if (enforceNumOutputPartitions) { + return records.coalesce(outputSparkPartitions); + } + return records; } @Override diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java index e1c34a8f840..10ec275064f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/NonSortPartitionerWithRows.java @@ -24,15 +24,41 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; /** - * A built-in partitioner that only does coalesce for input Rows for bulk insert operation, - * corresponding to the {@code BulkInsertSortMode.NONE} mode. - * + * A built-in partitioner that avoids expensive sorting for the input Rows for bulk insert + * operation, by doing either of the following: + * <p> + * - If enforcing the outputSparkPartitions, only does coalesce for input Rows; + * <p> + * - Otherwise, returns input Rows as is. + * <p> + * Corresponds to the {@code BulkInsertSortMode.NONE} mode. */ public class NonSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> { + private final boolean enforceNumOutputPartitions; + + /** + * Default constructor without enforcing the number of output partitions. + */ + public NonSortPartitionerWithRows() { + this(false); + } + + /** + * Constructor with `enforceNumOutputPartitions` config. + * + * @param enforceNumOutputPartitions Whether to enforce the number of output partitions. + */ + public NonSortPartitionerWithRows(boolean enforceNumOutputPartitions) { + this.enforceNumOutputPartitions = enforceNumOutputPartitions; + } + @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) { - return rows.coalesce(outputSparkPartitions); + if (enforceNumOutputPartitions) { + return rows.coalesce(outputSparkPartitions); + } + return rows; } @Override diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java index d04059e59f9..7bc64b54457 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java @@ -78,14 +78,21 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl } private static Stream<Arguments> configParams() { + // Parameters: + // BulkInsertSortMode sortMode, + // boolean isTablePartitioned, + // boolean enforceNumOutputPartitions, + // boolean isGloballySorted, + // boolean isLocallySorted Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false}, - {BulkInsertSortMode.NONE, true, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, + {BulkInsertSortMode.NONE, true, true, false, false}, + {BulkInsertSortMode.NONE, true, false, false, false} }; return Stream.of(data).map(Arguments::of); } @@ -99,20 +106,33 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, JavaRDD<HoodieRecord> records, - boolean isGloballySorted, boolean isLocallySorted, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords) { - testBulkInsertInternalPartitioner(partitioner, records, isGloballySorted, isLocallySorted, expectedPartitionNumRecords, Option.empty()); + testBulkInsertInternalPartitioner( + partitioner, + records, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + expectedPartitionNumRecords, + Option.empty()); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, JavaRDD<HoodieRecord> records, - boolean isGloballySorted, boolean isLocallySorted, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) { int numPartitions = 2; JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords = (JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) partitioner.repartitionRecords(records, numPartitions); - assertEquals(numPartitions, actualRecords.getNumPartitions()); + assertEquals( + enforceNumOutputPartitions ? numPartitions : records.getNumPartitions(), + actualRecords.getNumPartitions()); List<HoodieRecord<? extends HoodieRecordPayload>> collectedActualRecords = actualRecords.collect(); if (isGloballySorted) { // Verify global order @@ -137,18 +157,31 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl assertEquals(expectedPartitionNumRecords, actualPartitionNumRecords); } - @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}") + @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}") @MethodSource("configParams") public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, + boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted) { JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned), - records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1)); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerFactory.get(sortMode, isTablePartitioned), - records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2)); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records1, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records1)); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records2, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records2)); } @Test @@ -160,9 +193,9 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl JavaRDD<HoodieRecord> records1 = generateTestRecordsForBulkInsert(jsc); JavaRDD<HoodieRecord> records2 = generateTripleTestRecordsForBulkInsert(jsc); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), - records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(sortColumns, HoodieTestDataGenerator.AVRO_SCHEMA, false), - records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -172,9 +205,9 @@ public class TestBulkInsertInternalPartitioner extends HoodieClientTestBase impl .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records1, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); + records1, true, true, true, generateExpectedPartitionNumRecords(records1), Option.of(columnComparator)); testBulkInsertInternalPartitioner(new RDDCustomColumnsSortPartitioner(config), - records2, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); + records2, true, true, true, generateExpectedPartitionNumRecords(records2), Option.of(columnComparator)); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java index 6223c8bb9eb..de827f7a450 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java @@ -66,32 +66,53 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa } private static Stream<Arguments> configParams() { + // Parameters: + // BulkInsertSortMode sortMode, + // boolean isTablePartitioned, + // boolean enforceNumOutputPartitions, + // boolean isGloballySorted, + // boolean isLocallySorted Object[][] data = new Object[][] { - {BulkInsertSortMode.GLOBAL_SORT, true, true, true}, - {BulkInsertSortMode.PARTITION_SORT, true, false, true}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, false, false}, - {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, false, false}, - - {BulkInsertSortMode.NONE, true, false, false} + {BulkInsertSortMode.GLOBAL_SORT, true, true, true, true}, + {BulkInsertSortMode.PARTITION_SORT, true, true, false, true}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION, false, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, true, true, false, false}, + {BulkInsertSortMode.PARTITION_PATH_REPARTITION_AND_SORT, false, true, false, false}, + {BulkInsertSortMode.NONE, true, true, false, false}, + {BulkInsertSortMode.NONE, true, false, false, false} }; return Stream.of(data).map(Arguments::of); } - @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1}") + @ParameterizedTest(name = "[{index}] {0} isTablePartitioned={1} enforceNumOutputPartitions={2}") @MethodSource("configParams") public void testBulkInsertInternalPartitioner(BulkInsertSortMode sortMode, boolean isTablePartitioned, + boolean enforceNumOutputPartitions, boolean isGloballySorted, boolean isLocallySorted) throws Exception { Dataset<Row> records1 = generateTestRecords(); Dataset<Row> records2 = generateTestRecords(); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode, isTablePartitioned), - records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty()); - testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode, isTablePartitioned), - records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), Option.empty()); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerWithRowsFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records1, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records1), + Option.empty()); + testBulkInsertInternalPartitioner( + BulkInsertInternalPartitionerWithRowsFactory.get( + sortMode, isTablePartitioned, enforceNumOutputPartitions), + records2, + enforceNumOutputPartitions, + isGloballySorted, + isLocallySorted, + generateExpectedPartitionNumRecords(records2), + Option.empty()); } @Test @@ -103,9 +124,9 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa Comparator<Row> comparator = getCustomColumnComparator(sortColumns); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), - records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); HoodieWriteConfig config = HoodieWriteConfig .newBuilder() @@ -114,18 +135,24 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) .build(); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + records1, true, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), - records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + records2, true, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, - Dataset<Row> rows, - boolean isGloballySorted, boolean isLocallySorted, - Map<String, Long> expectedPartitionNumRecords, - Option<Comparator<Row>> comparator) { + Dataset<Row> rows, + boolean enforceNumOutputPartitions, + boolean isGloballySorted, + boolean isLocallySorted, + Map<String, Long> expectedPartitionNumRecords, + Option<Comparator<Row>> comparator) { int numPartitions = 2; Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions); + assertEquals( + enforceNumOutputPartitions ? numPartitions : rows.rdd().getNumPartitions(), + actualRecords.rdd().getNumPartitions()); + List<Row> collectedActualRecords = actualRecords.collectAsList(); if (isGloballySorted) { // Verify global order