stream2000 commented on code in PR #10515:
URL: https://github.com/apache/hudi/pull/10515#discussion_r1458254203
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java:
##########

@@ -72,7 +72,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> in
     HoodieWriteConfig newConfig = HoodieWriteConfig.newBuilder().withProps(props).build();

-    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), shouldPreserveHoodieMetadata);
+    ConsistentBucketIndexBulkInsertPartitionerWithRows partitioner = new ConsistentBucketIndexBulkInsertPartitionerWithRows(getHoodieTable(), strategyParams, shouldPreserveHoodieMetadata);

Review Comment:
   Sure, done.

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########

@@ -142,10 +203,11 @@ public void addHashingChildrenNodes(String partition, List<ConsistentHashingNode
   @Override
   public boolean arePartitionRecordsSorted() {
-    return false;
+    return (sortColumnNames != null && sortColumnNames.length > 0)
+        || table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE;
   }

-  private int getBucketId(Row row) {
+  private Integer getBucketId(Row row) {

Review Comment:
   Reverted it.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########

@@ -105,10 +121,55 @@ public int numPartitions() {
       }
     };

-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+          .mapToPair(row -> new Tuple2<>(row, row))
+          .repartitionAndSortWithinPartitions(partitioner, new CustomRowColumnsComparator())
+          .values(),
+          rows.schema());
+    } else if (table.requireSortedRecords() || table.getConfig().getBulkInsertSortMode() != BulkInsertSortMode.NONE) {

Review Comment:
   Yes, we are actually implementing `PARTITION_SORT`. I'm just wondering: for sort modes other than `PARTITION_SORT`, should we fall back to a no-sort behavior like `BulkInsertSortMode.NONE`, automatically switch to `PARTITION_SORT`, or throw an exception indicating that the sort mode is not supported? I'd appreciate your opinion; alternatively, we can keep the current behavior of switching to `PARTITION_SORT` automatically.

##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/ConsistentBucketIndexBulkInsertPartitionerWithRows.java:
##########

@@ -105,10 +121,55 @@ public int numPartitions() {
       }
     };

-    return rows.sparkSession().createDataFrame(rowJavaRDD
-        .mapToPair(row -> new Tuple2<>(getBucketId(row), row))
-        .partitionBy(partitioner)
-        .values(), rows.schema());
+    if (sortColumnNames != null && sortColumnNames.length > 0) {
+      return rows.sparkSession().createDataFrame(rowJavaRDD
+          .mapToPair(row -> new Tuple2<>(row, row))

Review Comment:
   We still need the row as the key for comparing and sorting, so keeping the line `.mapToPair(row -> new Tuple2<>(row, row))` is OK.
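
As background on the bucket lookup discussed above: the partitioner resolves each row to a bucket via `getBucketId`, which consults the table's consistent-hashing metadata. A generic consistent-hashing ring lookup can be sketched in plain Java (illustrative only, not Hudi's actual implementation; the node positions and bucket ids in the usage below are made up):

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative consistent-hashing ring (NOT Hudi's implementation): each node
// owns the hash range ending at its position; keys past the last node wrap
// around to the first node on the ring.
public class HashRing {
  // node position on the ring -> bucket id
  private final TreeMap<Integer, Integer> ring = new TreeMap<>();

  public HashRing(Map<Integer, Integer> nodes) {
    ring.putAll(nodes);
  }

  // First node at or clockwise after the hash, wrapping to the first node.
  public int bucketForHash(int hash) {
    Map.Entry<Integer, Integer> e = ring.ceilingEntry(hash);
    return (e != null ? e : ring.firstEntry()).getValue();
  }

  // Hash a record key to a non-negative value, then look it up on the ring.
  public int bucketFor(String recordKey) {
    return bucketForHash(recordKey.hashCode() & Integer.MAX_VALUE);
  }
}
```

With nodes at positions 100000 (bucket 1) and 200000 (bucket 2), a hash of 150000 lands on bucket 2, while a hash of 250000 wraps around to bucket 1.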
   Also, compared with `partitionBy` + `sortWithinPartitions`, `repartitionAndSortWithinPartitions` is more efficient because it performs the shuffle only once, with repartitioning and sorting happening in the same step. What do you think?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
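
The end state the reviewers describe, records routed to buckets with each bucket internally sorted, can be illustrated with a self-contained sketch (plain Java with hypothetical string keys and a simple hash-modulo bucket function; not Spark or Hudi code, and it does not model the distributed shuffle itself):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch (not Spark/Hudi code): group records by bucket and sort within each
// bucket, the per-partition ordering repartitionAndSortWithinPartitions
// produces as part of its single shuffle.
public class PartitionAndSort {

  // Hash-based bucket assignment, similar in spirit to a hash partitioner.
  static int bucketOf(String key, int numBuckets) {
    return (key.hashCode() & Integer.MAX_VALUE) % numBuckets;
  }

  // Route each key to its bucket, then sort each bucket's records.
  static Map<Integer, List<String>> partitionAndSort(List<String> keys, int numBuckets) {
    Map<Integer, List<String>> buckets = new HashMap<>();
    for (String k : keys) {
      buckets.computeIfAbsent(bucketOf(k, numBuckets), b -> new ArrayList<>()).add(k);
    }
    buckets.values().forEach(Collections::sort);
    return buckets;
  }
}
```

In Spark the advantage is that this grouping and ordering is achieved during the one shuffle, whereas `partitionBy` followed by a separate sort touches the data twice.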