stream2000 commented on code in PR #10515:
URL: https://github.com/apache/hudi/pull/10515#discussion_r1458254203


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java:
##########
@@ -72,7 +72,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> in
 
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();
 
-    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), shouldPreserveHoodieMetadata);
+    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), strategyParams, shouldPreserveHoodieMetadata);

Review Comment:
   Sure, done.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -142,10 +203,11 @@ public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode
 
   @Override
   public boolean arePartitionRecordsSorted() {
-    return false;
+    return (sortColumnNames != null && sortColumnNames.length > 0)
+        || table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE;
   }
 
-  private int getBucketId(Row row) {
+  private Integer getBucketId(Row row) {

Review Comment:
   Reverted it.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
       }
     };
 
-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+              .mapToPair(row -> new Tuple2<>(row, row))
+              .repartitionAndSortWithinPartitions(partitioner, new CustomRowColumnsComparator())
+              .values(),
+          rows.schema());
+    } else if (table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE) {

Review Comment:
   Yes, we are actually implementing `PARTITION_SORT`. I'm just wondering: for sort modes other than `PARTITION_SORT`, should we default to a 'no sort' behavior similar to `BulkInsertSortMode=NONE`, automatically switch to `PARTITION_SORT`, or throw an exception to indicate that the sort mode is not supported?
   
   I'd like your opinion; alternatively, we can keep the current behavior, which switches to `PARTITION_SORT` automatically.
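   The three options above could be sketched as plain policy functions. The enum values below mirror Hudi's `BulkInsertSortMode`, but this standalone class and its method names are hypothetical illustrations of the choices under discussion, not actual Hudi code:

```java
// Hypothetical sketch of the three policy choices for sort modes other
// than NONE / PARTITION_SORT. Enum values mirror Hudi's BulkInsertSortMode;
// the handlers themselves are illustrative only.
public class SortModePolicySketch {

  enum BulkInsertSortMode { NONE, GLOBAL_SORT, PARTITION_SORT, PARTITION_PATH_REPARTITION }

  // Option A: fall back to "no sort" for unsupported modes.
  static BulkInsertSortMode fallBackToNone(BulkInsertSortMode requested) {
    return requested == BulkInsertSortMode.PARTITION_SORT ? requested : BulkInsertSortMode.NONE;
  }

  // Option B: silently switch any non-NONE mode to PARTITION_SORT
  // (the PR's current behavior).
  static BulkInsertSortMode switchToPartitionSort(BulkInsertSortMode requested) {
    return requested == BulkInsertSortMode.NONE ? requested : BulkInsertSortMode.PARTITION_SORT;
  }

  // Option C: reject unsupported modes loudly.
  static BulkInsertSortMode rejectUnsupported(BulkInsertSortMode requested) {
    if (requested != BulkInsertSortMode.NONE && requested != BulkInsertSortMode.PARTITION_SORT) {
      throw new UnsupportedOperationException("Sort mode not supported: " + requested);
    }
    return requested;
  }

  public static void main(String[] args) {
    System.out.println(fallBackToNone(BulkInsertSortMode.GLOBAL_SORT));        // NONE
    System.out.println(switchToPartitionSort(BulkInsertSortMode.GLOBAL_SORT)); // PARTITION_SORT
    try {
      rejectUnsupported(BulkInsertSortMode.GLOBAL_SORT);
    } catch (UnsupportedOperationException e) {
      System.out.println("rejected");
    }
  }
}
```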



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########
@@ -105,10 +121,55 @@ public int numPartitions() {
       }
     };
 
-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+              .mapToPair(row -> new Tuple2<>(row, row))

Review Comment:
   We still need the row for comparing and sorting, so keeping this line `.mapToPair(row -> new Tuple2<>(row, row))` is OK.
   
   Also, compared with `partitionBy` + `sortWithinPartitions`, `repartitionAndSortWithinPartitions` is more efficient because it performs the shuffle only once, with both repartitioning and sorting happening in the same step. What do you think?
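   As a plain-Java illustration of those semantics (not Spark itself, and with a hypothetical hash-based bucket function standing in for the consistent-hashing partitioner): each record is routed to its target partition, and every partition's records come out sorted by the supplied comparator, conceptually in the same pass.

```java
import java.util.*;
import java.util.stream.*;

// Plain-Java sketch of what repartitionAndSortWithinPartitions guarantees:
// records land in their target partition AND arrive sorted within it.
// The bucket function (hash of the row) is a hypothetical stand-in for
// Hudi's consistent-hashing bucket lookup.
public class RepartitionAndSortSketch {

  static Map<Integer, List<String>> repartitionAndSort(List<String> rows, int numPartitions,
                                                       Comparator<String> comparator) {
    // "Shuffle": route each row to its partition.
    Map<Integer, List<String>> partitions = rows.stream()
        .collect(Collectors.groupingBy(r -> Math.floorMod(r.hashCode(), numPartitions)));
    // Sort within each partition -- in Spark this happens during the same
    // shuffle, which is why it beats partitionBy followed by a separate sort.
    partitions.values().forEach(p -> p.sort(comparator));
    return partitions;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> parts =
        repartitionAndSort(Arrays.asList("d", "a", "c", "b"), 2, Comparator.naturalOrder());
    // Each partition's list is sorted, e.g. partition contents like [b, d] and [a, c].
    parts.forEach((id, p) -> System.out.println(id + " -> " + p));
  }
}
```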



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
