alexeykudinkin commented on code in PR #7411: URL: https://github.com/apache/hudi/pull/7411#discussion_r1046110926
########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java: ########## @@ -126,35 +134,40 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner boolean isGloballySorted, boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, - Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) { + Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator, + boolean populateMetaFields) { int numPartitions = 2; - JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords = - (JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) partitioner.repartitionRecords(records, numPartitions); - assertEquals( - enforceNumOutputPartitions ? numPartitions : records.getNumPartitions(), - actualRecords.getNumPartitions()); - List<HoodieRecord<? extends HoodieRecordPayload>> collectedActualRecords = actualRecords.collect(); - if (isGloballySorted) { - // Verify global order - verifyRecordAscendingOrder(collectedActualRecords, comparator); - } else if (isLocallySorted) { - // Verify local order - actualRecords.mapPartitions(partition -> { - List<HoodieRecord<? 
extends HoodieRecordPayload>> partitionRecords = new ArrayList<>(); - partition.forEachRemaining(partitionRecords::add); - verifyRecordAscendingOrder(partitionRecords, comparator); - return Collections.emptyList().iterator(); - }).collect(); - } + if (!populateMetaFields) { + assertThrows(IllegalStateException.class, () -> partitioner.repartitionRecords(records, numPartitions)); Review Comment: We can just return here; then there would be no need to shift the block below ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java: ########## @@ -25,13 +25,22 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.functions; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + /** * A built-in partitioner that does global sorting for the input Rows across partitions after repartition for bulk insert operation, corresponding to the {@code BulkInsertSortMode.GLOBAL_SORT} mode. */ public class GlobalSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> { + private final boolean populateMetaFields; + + public GlobalSortPartitionerWithRows(boolean populateMetaFields) { + this.populateMetaFields = populateMetaFields; + } + @Override public Dataset<Row> repartitionRecords(Dataset<Row> rows, int outputSparkPartitions) { + checkState(populateMetaFields, "Meta fields are disabled!"); Review Comment: This exception is aimed at the user and should convey a complete message so the user can understand the misconfiguration.
Let's refine it to be a bit more descriptive, for example: "Global Sort mode requires meta-fields to be enabled" (and also throw HoodieException) ########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java: ########## @@ -146,37 +155,42 @@ private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner boolean isGloballySorted, boolean isLocallySorted, Map<String, Long> expectedPartitionNumRecords, - Option<Comparator<Row>> comparator) { + Option<Comparator<Row>> comparator, + boolean populateMetaFields) { int numPartitions = 2; - Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions); - assertEquals( - enforceNumOutputPartitions ? numPartitions : rows.rdd().getNumPartitions(), - actualRecords.rdd().getNumPartitions()); - - List<Row> collectedActualRecords = actualRecords.collectAsList(); - if (isGloballySorted) { - // Verify global order - verifyRowsAscendingOrder(collectedActualRecords, comparator); - } else if (isLocallySorted) { - // Verify local order - actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input -> { - List<Row> partitionRows = new ArrayList<>(); - while (input.hasNext()) { - partitionRows.add(input.next()); - } - verifyRowsAscendingOrder(partitionRows, comparator); - return Collections.emptyList().iterator(); - }, SparkDatasetTestUtils.ENCODER); - } + if (!populateMetaFields) { + assertThrows(IllegalStateException.class, () -> partitioner.repartitionRecords(rows, numPartitions)); Review Comment: Same as above ########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java: ########## @@ -25,13 +25,22 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.functions; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + /** * A built-in partitioner that does global sorting for the input Rows across partitions
after repartition for bulk insert operation, corresponding to the {@code BulkInsertSortMode.GLOBAL_SORT} mode. */ public class GlobalSortPartitionerWithRows implements BulkInsertPartitioner<Dataset<Row>> { + private final boolean populateMetaFields; + + public GlobalSortPartitionerWithRows(boolean populateMetaFields) { + this.populateMetaFields = populateMetaFields; Review Comment: nit: `shouldPopulateMetaFields`; otherwise this reads as a directive rather than a state -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org