This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 98c5c6c654 [HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row (#5502)
98c5c6c654 is described below

commit 98c5c6c6543177ff4320b73b13bc153938300fe4
Author: RexAn <anh...@126.com>
AuthorDate: Thu May 26 13:09:04 2022 +0800

    [HUDI-4040] Bulk insert Support CustomColumnsSortPartitioner with Row (#5502)
    
    * Along the lines of RDDCustomColumnsSortPartitioner but for Row
---
 .../RDDCustomColumnsSortPartitioner.java           |  5 +-
 .../RowCustomColumnsSortPartitioner.java           | 60 ++++++++++++++++++++++
 .../TestBulkInsertInternalPartitionerForRows.java  | 55 +++++++++++++++++---
 3 files changed, 112 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
index 2fe6fe969c..b1cbe47a6b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDCustomColumnsSortPartitioner.java
@@ -29,6 +29,8 @@ import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 
+import java.util.Arrays;
+
 /**
  * A partitioner that does sorting based on specified column values for each RDD partition.
  *
@@ -78,6 +80,7 @@ public class RDDCustomColumnsSortPartitioner<T extends HoodieRecordPayload>
   }
 
   private String[] getSortColumnName(HoodieWriteConfig config) {
-    return config.getUserDefinedBulkInsertPartitionerSortColumns().split(",");
+    return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
+        .map(String::trim).toArray(String[]::new);
   }
 }
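
The change above matters when the sort columns are configured with whitespace
around the commas, e.g. "rider, driver". A minimal standalone sketch of the
parsing behavior before and after (the column names are illustrative, not
taken from the commit):

    import java.util.Arrays;

    public class SortColumnParsingSketch {
      public static void main(String[] args) {
        String configured = "rider, driver";  // user-supplied, note the space
        // Before: split(",") alone keeps the leading space, so the second
        // entry is " driver", which matches no column in the schema.
        String[] raw = configured.split(",");
        // After: each entry is trimmed into a valid column name.
        String[] trimmed = Arrays.stream(configured.split(","))
            .map(String::trim).toArray(String[]::new);
        System.out.println(Arrays.toString(raw));      // [rider,  driver]
        System.out.println(Arrays.toString(trimmed));  // [rider, driver]
      }
    }
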
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
new file mode 100644
index 0000000000..ceeb2b3fe8
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RowCustomColumnsSortPartitioner.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.execution.bulkinsert;
+
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+import java.util.Arrays;
+
+/**
+ * A partitioner that does sorting based on specified column values for each Spark partition.
+ */
+public class RowCustomColumnsSortPartitioner implements BulkInsertPartitioner<Dataset<Row>> {
+
+  private final String[] sortColumnNames;
+
+  public RowCustomColumnsSortPartitioner(HoodieWriteConfig config) {
+    this.sortColumnNames = getSortColumnName(config);
+  }
+
+  public RowCustomColumnsSortPartitioner(String[] columnNames) {
+    this.sortColumnNames = columnNames;
+  }
+
+  @Override
+  public Dataset<Row> repartitionRecords(Dataset<Row> records, int outputSparkPartitions) {
+    final String[] sortColumns = this.sortColumnNames;
+    return records.coalesce(outputSparkPartitions)
+        .sortWithinPartitions(HoodieRecord.PARTITION_PATH_METADATA_FIELD, sortColumns);
+  }
+
+  @Override
+  public boolean arePartitionRecordsSorted() {
+    return true;
+  }
+
+  private String[] getSortColumnName(HoodieWriteConfig config) {
+    return Arrays.stream(config.getUserDefinedBulkInsertPartitionerSortColumns().split(","))
+        .map(String::trim).toArray(String[]::new);
+  }
+}
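
A minimal usage sketch for the new partitioner, assuming a running
SparkSession and a Dataset<Row> that already carries the Hudi metadata
columns; the base path and the "rider"/"driver" sort columns below are
hypothetical:

    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;

    public class RowSortPartitionerSketch {
      // inputRows is assumed to contain _hoodie_partition_path plus the sort columns.
      static Dataset<Row> sortForBulkInsert(Dataset<Row> inputRows) {
        HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")  // hypothetical base path
            .withUserDefinedBulkInsertPartitionerClass(
                RowCustomColumnsSortPartitioner.class.getName())
            .withUserDefinedBulkInsertPartitionerSortColumns("rider,driver")
            .build();
        RowCustomColumnsSortPartitioner partitioner =
            new RowCustomColumnsSortPartitioner(config);
        // Coalesce to 4 Spark partitions; rows within each partition come out
        // sorted by (_hoodie_partition_path, rider, driver).
        return partitioner.repartitionRecords(inputRows, 4);
      }
    }
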
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
index 276ad5b43a..52cf67228a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/execution/bulkinsert/TestBulkInsertInternalPartitionerForRows.java
@@ -20,6 +20,8 @@ package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.testutils.HoodieClientTestHarness;
 import org.apache.hudi.testutils.SparkDatasetTestUtils;
@@ -29,6 +31,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -48,6 +51,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
  */
 public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHarness {
 
+  private static final Comparator<Row> KEY_COMPARATOR =
+      Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD)));
   @BeforeEach
   public void setUp() throws Exception {
     initSparkContexts("TestBulkInsertInternalPartitionerForRows");
@@ -77,21 +82,47 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
     Dataset<Row> records1 = generateTestRecords();
     Dataset<Row> records2 = generateTestRecords();
     testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
-        records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1));
+        records1, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records1), Option.empty());
     testBulkInsertInternalPartitioner(BulkInsertInternalPartitionerWithRowsFactory.get(sortMode),
-        records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2));
+        records2, isGloballySorted, isLocallySorted, generateExpectedPartitionNumRecords(records2), Option.empty());
+  }
+
+  @Test
+  public void testCustomColumnSortPartitionerWithRows() {
+    Dataset<Row> records1 = generateTestRecords();
+    Dataset<Row> records2 = generateTestRecords();
+    String sortColumnString = records1.columns()[5];
+    String[] sortColumns = sortColumnString.split(",");
+    Comparator<Row> comparator = getCustomColumnComparator(sortColumns);
+
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
+        records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(sortColumns),
+        records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
+
+    HoodieWriteConfig config = HoodieWriteConfig
+        .newBuilder()
+        .withPath("/")
+        .withUserDefinedBulkInsertPartitionerClass(RowCustomColumnsSortPartitioner.class.getName())
+        .withUserDefinedBulkInsertPartitionerSortColumns(sortColumnString)
+        .build();
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
+        records1, false, true, generateExpectedPartitionNumRecords(records1), Option.of(comparator));
+    testBulkInsertInternalPartitioner(new RowCustomColumnsSortPartitioner(config),
+        records2, false, true, generateExpectedPartitionNumRecords(records2), Option.of(comparator));
   }
 
   private void testBulkInsertInternalPartitioner(BulkInsertPartitioner partitioner,
       Dataset<Row> rows,
       boolean isGloballySorted, boolean isLocallySorted,
-      Map<String, Long> expectedPartitionNumRecords) {
+      Map<String, Long> expectedPartitionNumRecords,
+      Option<Comparator<Row>> comparator) {
     int numPartitions = 2;
     Dataset<Row> actualRecords = (Dataset<Row>) partitioner.repartitionRecords(rows, numPartitions);
     List<Row> collectedActualRecords = actualRecords.collectAsList();
     if (isGloballySorted) {
       // Verify global order
-      verifyRowsAscendingOrder(collectedActualRecords);
+      verifyRowsAscendingOrder(collectedActualRecords, comparator);
     } else if (isLocallySorted) {
       // Verify local order
       actualRecords.mapPartitions((MapPartitionsFunction<Row, Object>) input -> {
@@ -99,7 +130,7 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
         while (input.hasNext()) {
           partitionRows.add(input.next());
         }
-        verifyRowsAscendingOrder(partitionRows);
+        verifyRowsAscendingOrder(partitionRows, comparator);
         return Collections.emptyList().iterator();
       }, SparkDatasetTestUtils.ENCODER);
     }
@@ -130,10 +161,20 @@ public class TestBulkInsertInternalPartitionerForRows extends HoodieClientTestHa
     return rowsPart1.union(rowsPart2);
   }
 
-  private void verifyRowsAscendingOrder(List<Row> records) {
+  private void verifyRowsAscendingOrder(List<Row> records, Option<Comparator<Row>> comparator) {
     List<Row> expectedRecords = new ArrayList<>(records);
-    Collections.sort(expectedRecords, Comparator.comparing(o -> (o.getAs(HoodieRecord.PARTITION_PATH_METADATA_FIELD) + "+" + o.getAs(HoodieRecord.RECORD_KEY_METADATA_FIELD))));
+    Collections.sort(expectedRecords, comparator.orElse(KEY_COMPARATOR));
     assertEquals(expectedRecords, records);
   }
 
+  private Comparator<Row> getCustomColumnComparator(String[] sortColumns) {
+    Comparator<Row> comparator = Comparator.comparing(row -> {
+      StringBuilder sb = new StringBuilder();
+      for (String col : sortColumns) {
+        sb.append(row.getAs(col).toString());
+      }
+      return sb.toString();
+    });
+    return comparator;
+  }
 }
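
The test's custom comparator orders rows by the concatenated string form of
each sort column's value. The same idea as a standalone sketch over plain
maps (the data is made up):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Map;

    public class ConcatComparatorSketch {
      public static void main(String[] args) {
        String[] sortColumns = {"rider", "driver"};
        // Mirrors getCustomColumnComparator: build the sort key by appending
        // the string form of each sort column in order.
        Comparator<Map<String, String>> byColumns = Comparator.comparing(row -> {
          StringBuilder sb = new StringBuilder();
          for (String col : sortColumns) {
            sb.append(row.get(col));
          }
          return sb.toString();
        });
        List<Map<String, String>> rows = new ArrayList<>(List.of(
            Map.of("rider", "r2", "driver", "d1"),
            Map.of("rider", "r1", "driver", "d9")));
        rows.sort(byColumns);
        System.out.println(rows);  // the rider=r1 row sorts first
      }
    }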
