This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a439ea0f449 [HUDI-6457] Keep JavaSizeBasedClusteringPlanStrategy and SparkSizeBasedClusteringPlanStrategy aligned (#9099)
a439ea0f449 is described below

commit a439ea0f449fb334f0823323651ec1512f4cd5df
Author: ksmou <135721692+ks...@users.noreply.github.com>
AuthorDate: Fri Jun 30 19:39:31 2023 +0800

    [HUDI-6457] Keep JavaSizeBasedClusteringPlanStrategy and SparkSizeBasedClusteringPlanStrategy aligned (#9099)
---
 .../JavaSizeBasedClusteringPlanStrategy.java       | 53 +++++++++++++---------
 1 file changed, 32 insertions(+), 21 deletions(-)

diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
index fe66cedb133..d8f0c5fc804 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -60,41 +60,52 @@ public class JavaSizeBasedClusteringPlanStrategy<T>
 
   @Override
   protected Stream<HoodieClusteringGroup> 
buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> 
fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort fileSlices before dividing, which makes dividing more compact
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() 
: writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? 
o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
     long totalSizeSoFar = 0;
-    HoodieWriteConfig writeConfig = getWriteConfig();
-    for (FileSlice currentSlice : fileSlices) {
-      // assume each filegroup size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
+
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && 
!currentGroup.isEmpty()) {
+      if (totalSizeSoFar + currentSize > 
writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
         int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
         LOG.info("Adding one clustering group " + totalSizeSoFar + " max 
bytes: "
-                + writeConfig.getClusteringMaxBytesInGroup() + " num input 
slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
         fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
         currentGroup = new ArrayList<>();
         totalSizeSoFar = 0;
       }
+
+      // Add to the current file-group
       currentGroup.add(currentSlice);
-      // totalSizeSoFar could be 0 when new group was created in the previous 
conditional block.
-      // reset to the size of current slice, otherwise the number of output 
file group will become 0 even though current slice is present.
-      if (totalSizeSoFar == 0) {
-        totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? 
currentSlice.getBaseFile().get().getFileSize() : 
writeConfig.getParquetMaxFileSize();
-      }
+      // assume each file group size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSize;
     }
+
     if (!currentGroup.isEmpty()) {
-      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
-      LOG.info("Adding final clustering group " + totalSizeSoFar + " max 
bytes: "
-              + writeConfig.getClusteringMaxBytesInGroup() + " num input 
slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      if (currentGroup.size() > 1 || 
writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, 
writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group " + totalSizeSoFar + " max 
bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: 
" + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
     }
-    
-    return fileSliceGroups.stream().map(fileSliceGroup -> 
HoodieClusteringGroup.newBuilder()
-        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-        .setNumOutputFileGroups(fileSliceGroup.getRight())
-        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-        .build());
+
+    return fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build());
   }
 
   @Override

Reply via email to