Zouxxyy commented on code in PR #7872: URL: https://github.com/apache/hudi/pull/7872#discussion_r1105540267
########## hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java: ########## @@ -19,43 +19,70 @@ package org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.table.BulkInsertPartitioner; - +import org.apache.spark.sql.Column; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; +import scala.collection.JavaConverters; import java.util.Arrays; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.ValidationUtils.checkState; +import static org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner.getOrderByColumnNames; /** - * A partitioner that does sorting based on specified column values for each spark partitions. + * A partitioner that does local sorting for each RDD partition based on the tuple of + * values of the columns configured for ordering. 
*/ -public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> { +public class RowCustomColumnsSortPartitioner extends RepartitioningBulkInsertPartitionerBase<Dataset<Row>> { - private final String[] sortColumnNames; + private final String[] orderByColumnNames; - public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) { - this.sortColumnNames = getSortColumnName(config); + public RowCustomColumnsSortPartitioner(HoodieWriteConfig config, HoodieTableConfig tableConfig) { + super(tableConfig); + this.orderByColumnNames = getOrderByColumnNames(config); + + checkState(orderByColumnNames.length > 0); } - public RowCustomColumnsSortPartitioner(String[] columnNames) { - this.sortColumnNames = columnNames; + public RowCustomColumnsSortPartitioner(String[] columnNames, HoodieTableConfig tableConfig) { + super(tableConfig); + this.orderByColumnNames = columnNames; + + checkState(orderByColumnNames.length > 0); } @Override - public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) { - final String[] sortColumns = this.sortColumnNames; - return records.sort(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns) - .coalesce(outputSparkPartitions); + public Dataset<Row> repartitionRecords(Dataset<Row> dataset, int targetPartitionNumHint) { + Dataset<Row> repartitionedDataset; + + // NOTE: In case of partitioned table even "global" ordering (across all RDD partitions) could + // not change table's partitioning and therefore there's no point in doing global sorting + // across "physical" partitions, and instead we can reduce total amount of data being + // shuffled by doing do "local" sorting: + // - First, re-partitioning dataset such that "logical" partitions are aligned w/ + // "physical" ones + // - Sorting locally w/in RDD ("logical") partitions + // + // Non-partitioned tables will be globally sorted. 
+ if (isPartitionedTable) { + repartitionedDataset = dataset.repartition(handleTargetPartitionNumHint(targetPartitionNumHint), + new Column(HoodieRecord.PARTITION_PATH_METADATA_FIELD)); + } else { + repartitionedDataset = tryCoalesce(dataset, targetPartitionNumHint); Review Comment: In addition, I wonder whether `coalesce` can meet our needs here. For example, if we want to turn a `clusteringGroup` containing N logical partitions into M (M > N) partitions (using `cluster`), a shuffle needs to happen anyway, since `coalesce` cannot increase the number of partitions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org