This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 98c5c6c654 [HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row (#5502) 98c5c6c654 is described below commit 98c5c6c6543177ff4320b73b13bc153938300fe4 Author: RexAn <anh...@126.com> AuthorDate: Thu May 26 13:09:04 2022 +0800 [HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row (#5502) * Along the lines of RDDCustomColumnsSortPartitioner but for Row --- .../RDDCustomColumnsSortPartitioner.java | 5 +- .../RowCustomColumnsSortPartitioner.java | 60 ++++++++++++++++++++++ .../TestBulkInsertInternalPartitionerForRows.java | 55 +++++++++++++++++--- 3 files changed, 112 insertions(+), 8 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java index 2fe6fe969c..b1cbe47a6b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java @@ -29,6 +29,8 @@ import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.avro.Schema; import org.apache.spark.api.java.JavaRDD; +import java.util.Arrays; + /** * A partitioner that does sorting based on specified column values for each RDD partition. 
* @@ -78,6 +80,7 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload> } private String[] getSortColumnName(HoodieWriteConfig config) { - return config.getUserDefinedBulkInsertPartitionerSortColumns().split(","); + return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(",")) + .map(String::trim).toArray(String[]::new); /* Trim each comma-separated name so "a, b" yields columns "a" and "b". NOTE(review): empty entries (e.g. from "a,,b") survive as "" column names; confirm the config is validated upstream. */ } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java new file mode 100644 index 0000000000..ceeb2b3fe8 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hudi.execution.bulkinsert; + +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.BulkInsertPartitioner; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +import java.util.Arrays; + +/** + * A partitioner that sorts rows within each Spark partition, first by the partition-path metadata field and then by the user-specified columns. + */ +public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> { + /** Columns to sort by within each partition, applied after the partition-path metadata field. */ + private final String[] sortColumnNames; + /** Creates a partitioner whose sort columns are read from the comma-separated user-defined sort-columns setting of {@code config}, with each name trimmed. */ + public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) { + this.sortColumnNames = getSortColumnName(config); + } + /** Creates a partitioner that sorts by {@code columnNames}; the array is stored as-is, not copied. */ + public RowCustomColumnsSortPartitioner(String[] columnNames) { + this.sortColumnNames = columnNames; + } + /** Coalesces {@code records} to {@code outputSparkPartitions} partitions and sorts each partition by the partition-path metadata field, then by the configured columns. NOTE(review): Spark documents Dataset.coalesce as unable to increase the partition count; confirm that is acceptable when the input has fewer partitions than requested. */ + @Override + public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) { + final String[] sortColumns = this.sortColumnNames; + return records.coalesce(outputSparkPartitions) + .sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns); + } + /** Returns {@code true}: {@link #repartitionRecords} sorts the records within every output partition. */ + @Override + public boolean arePartitionRecordsSorted() { + return true; + } + /** Splits the configured sort-column string on commas and trims each entry. NOTE(review): if the setting is unset (null?) this throws NullPointerException, and empty entries (e.g. from "a,,b") are kept as ""; confirm upstream validation. */ + private String[] getSortColumnName(HoodieWriteConfig config) { + return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(",")) + .map(String::trim).toArray(String[]::new); + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java index 276ad5b43a..52cf67228a 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java @@ -20,6 +20,8 @@ package
org.apache.hudi.execution.bulkinsert; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.BulkInsertPartitioner; import org.apache.hudi.testutils.HoodieClientTestHarness; import org.apache.hudi.testutils.SparkDatasetTestUtils; @@ -29,6 +31,7 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; @@ -48,6 +51,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; */ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness { + /** Default expected ordering: partition path and record key joined with "+" and compared as one string. */ private static final Comparator<Row> KEY_COMPARATOR = + Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))); @BeforeEach public void setUp() throws Exception { initSparkContexts("TestBulkInsertInternalPartitionerForRows"); @@ -77,21 +82,47 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa Dataset<Row> records1 = generateTestRecords(); Dataset<Row> records2 = generateTestRecords(); testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode), - records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1)); + records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty()); testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode), - records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2)); + records2, isGloballySorted, isLocallySorted,
generateExpectedPartitionNumRecords(records2), Option.empty()); + } + /** Verifies RowCustomColumnsSortPartitioner built from explicit column names and from a HoodieWriteConfig, expecting per-partition (not global) ordering. */ + @Test + public void testCustomColumnSortPartitionerWithRows() { + Dataset<Row> records1 = generateTestRecords(); + Dataset<Row> records2 = generateTestRecords(); + String sortColumnString = records1.columns()[5]; /* NOTE(review): selects the sixth column of the generated test schema by position; naming it explicitly would make the intent clearer. */ + String[] sortColumns = sortColumnString.split(","); + Comparator<Row> comparator = getCustomColumnComparator(sortColumns); /* NOTE(review): the partitioner sorts by the partition-path metadata field before these columns, but this comparator ignores it, so it would presumably reject valid output spanning several partition paths if the local-order check actually executed. */ + + testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), + records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns), + records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); + + HoodieWriteConfig config = HoodieWriteConfig + .newBuilder() + .withPath("/") + .withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName()) + .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString) + .build(); + testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), + records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator)); + testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config), + records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator)); } private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner, Dataset<Row> rows, boolean isGloballySorted, boolean isLocallySorted, - Map<String, Long> expectedPartitionNumRecords) { + Map<String, Long> expectedPartitionNumRecords, + Option<Comparator<Row>> comparator) { int numPartitions = 2; Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions); List<Row> collectedActualRecords = actualRecords.collectAsList(); if (isGloballySorted) { // Verify global order - verifyRowsAscendingOrder(collectedActualRecords); +
verifyRowsAscendingOrder(collectedActualRecords, comparator); } else if (isLocallySorted) { // Verify local order actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input -> { @@ -99,7 +130,7 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa while (input.hasNext()) { partitionRows.add(input.next()); } - verifyRowsAscendingOrder(partitionRows); + verifyRowsAscendingOrder(partitionRows, comparator); /* NOTE(review): the Dataset returned by this mapPartitions call is discarded and Spark transformations are lazy, so this check appears never to run; an action (e.g. count()) on the result is presumably needed. */ return Collections.emptyList().iterator(); }, SparkDatasetTestUtils.ENCODER); } @@ -130,10 +161,20 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa return rowsPart1.union(rowsPart2); } - private void verifyRowsAscendingOrder(List<Row> records) { + private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) { List<Row> expectedRecords = new ArrayList<>(records); - Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)))); + Collections.sort(expectedRecords,comparator.orElse(KEY_COMPARATOR)); assertEquals(expectedRecords, records); } + /** Builds a comparator over the concatenated string values of {@code sortColumns}, taken in order. */ private Comparator<Row> getCustomColumnComparator(String[] sortColumns) { + Comparator<Row> comparator = Comparator.comparing(row -> { + StringBuilder sb = new StringBuilder(); + for (String col : sortColumns) { + sb.append(row.getAs(col).toString()); /* NOTE(review): values are concatenated with no separator and compared as strings, so numeric columns order lexicographically (e.g. "10" before "9"), distinct multi-column keys can collide, and a null value throws NullPointerException. */ + } + return sb.toString(); + }); + return comparator; + } }