alexeykudinkin commented on code in PR #7411:
URL: https://github.com/apache/hudi/pull/7411#discussion_r1046110926


##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitioner.java:
##########
@@ -126,35 +134,40 @@ private void 
testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner
                                                  boolean isGloballySorted,
                                                  boolean isLocallySorted,
                                                  Map<String, Long> 
expectedPartitionNumRecords,
-                                                 
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator) {
+                                                 
Option<Comparator<HoodieRecord<? extends HoodieRecordPayload>>> comparator,
+                                                 boolean populateMetaFields) {
     int numPartitions = 2;
-    JavaRDD<HoodieRecord<? extends HoodieRecordPayload>> actualRecords =
-        (JavaRDD<HoodieRecord<? extends HoodieRecordPayload>>) 
partitioner.repartitionRecords(records, numPartitions);
-    assertEquals(
-        enforceNumOutputPartitions ? numPartitions : 
records.getNumPartitions(),
-        actualRecords.getNumPartitions());
-    List<HoodieRecord<? extends HoodieRecordPayload>> collectedActualRecords = 
actualRecords.collect();
-    if (isGloballySorted) {
-      // Verify global order
-      verifyRecordAscendingOrder(collectedActualRecords, comparator);
-    } else if (isLocallySorted) {
-      // Verify local order
-      actualRecords.mapPartitions(partition -> {
-        List<HoodieRecord<? extends HoodieRecordPayload>> partitionRecords = 
new ArrayList<>();
-        partition.forEachRemaining(partitionRecords::add);
-        verifyRecordAscendingOrder(partitionRecords, comparator);
-        return Collections.emptyList().iterator();
-      }).collect();
-    }
+    if (!populateMetaFields) {
+      assertThrows(IllegalStateException.class, () -> 
partitioner.repartitionRecords(records, numPartitions));

Review Comment:
   We can just return in here, there there would be no need to shift the block 
below



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java:
##########
@@ -25,13 +25,22 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.functions;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * A built-in partitioner that does global sorting for the input Rows across 
partitions after repartition for bulk insert operation, corresponding to the 
{@code BulkInsertSortMode.GLOBAL_SORT} mode.
  */
 public class GlobalSortPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
 
+  private final boolean populateMetaFields;
+
+  public GlobalSortPartitionerWithRows(boolean populateMetaFields) {
+    this.populateMetaFields = populateMetaFields;
+  }
+
   @Override
   public Dataset<Row> repartitionRecords(Dataset<Row> rows, int 
outputSparkPartitions) {
+    checkState(populateMetaFields, "Meta fields are disabled!");

Review Comment:
   This exception is aimed at the user and should convey the full message for 
user to understand the misconfiguration. Let's refine it to be a little more 
hinting, like for ex: "Global Sort mode requires meta-fields to be enabled" 
(and also throw HoodieException)



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java:
##########
@@ -146,37 +155,42 @@ private void 
testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner
                                                  boolean isGloballySorted,
                                                  boolean isLocallySorted,
                                                  Map<String, Long> 
expectedPartitionNumRecords,
-                                                 Option<Comparator<Row>> 
comparator) {
+                                                 Option<Comparator<Row>> 
comparator,
+                                                 boolean populateMetaFields) {
     int numPartitions = 2;
-    Dataset<Row> actualRecords = (Dataset<Row>) 
partitioner.repartitionRecords(rows, numPartitions);
-    assertEquals(
-        enforceNumOutputPartitions ? numPartitions : 
rows.rdd().getNumPartitions(),
-        actualRecords.rdd().getNumPartitions());
-
-    List<Row> collectedActualRecords = actualRecords.collectAsList();
-    if (isGloballySorted) {
-      // Verify global order
-      verifyRowsAscendingOrder(collectedActualRecords, comparator);
-    } else if (isLocallySorted) {
-      // Verify local order
-      actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input 
-> {
-        List<Row> partitionRows = new ArrayList<>();
-        while (input.hasNext()) {
-          partitionRows.add(input.next());
-        }
-        verifyRowsAscendingOrder(partitionRows, comparator);
-        return Collections.emptyList().iterator();
-      }, SparkDatasetTestUtils.ENCODER);
-    }
+    if (!populateMetaFields) {
+      assertThrows(IllegalStateException.class, () -> 
partitioner.repartitionRecords(rows, numPartitions));

Review Comment:
   Same as above



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/GlobalSortPartitionerWithRows.java:
##########
@@ -25,13 +25,22 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.functions;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * A built-in partitioner that does global sorting for the input Rows across 
partitions after repartition for bulk insert operation, corresponding to the 
{@code BulkInsertSortMode.GLOBAL_SORT} mode.
  */
 public class GlobalSortPartitionerWithRows implements 
BulkInsertPartitioner<Dataset<Row>> {
 
+  private final boolean populateMetaFields;
+
+  public GlobalSortPartitionerWithRows(boolean populateMetaFields) {
+    this.populateMetaFields = populateMetaFields;

Review Comment:
   nit: `shouldPopulateMetaFields`, otherwise this looks as a directive not a 
state 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to