This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a439ea0f449 [HUDI-6457] Keep JavaSizeBasedClusteringPlanStrategy and SparkSizeBasedClusteringPlanStrategy aligned (#9099)
a439ea0f449 is described below

commit a439ea0f449fb334f0823323651ec1512f4cd5df
Author: ksmou <135721692+ks...@users.noreply.github.com>
AuthorDate: Fri Jun 30 19:39:31 2023 +0800

    [HUDI-6457] Keep JavaSizeBasedClusteringPlanStrategy and SparkSizeBasedClusteringPlanStrategy aligned (#9099)
---
 .../JavaSizeBasedClusteringPlanStrategy.java       | 53 +++++++++++++---------
 1 file changed, 32 insertions(+), 21 deletions(-)

diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
index fe66cedb133..d8f0c5fc804 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/plan/strategy/JavaSizeBasedClusteringPlanStrategy.java
@@ -60,41 +60,52 @@ public class JavaSizeBasedClusteringPlanStrategy<T>
   @Override
   protected Stream<HoodieClusteringGroup> buildClusteringGroupsForPartition(String partitionPath, List<FileSlice> fileSlices) {
+    HoodieWriteConfig writeConfig = getWriteConfig();
+
     List<Pair<List<FileSlice>, Integer>> fileSliceGroups = new ArrayList<>();
     List<FileSlice> currentGroup = new ArrayList<>();
+
+    // Sort fileSlices before dividing, which makes dividing more compact
+    List<FileSlice> sortedFileSlices = new ArrayList<>(fileSlices);
+    sortedFileSlices.sort((o1, o2) -> (int)
+        ((o2.getBaseFile().isPresent() ? o2.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())
+            - (o1.getBaseFile().isPresent() ? o1.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize())));
+
     long totalSizeSoFar = 0;
-    HoodieWriteConfig writeConfig = getWriteConfig();
-    for (FileSlice currentSlice : fileSlices) {
-      // assume each filegroup size is ~= parquet.max.file.size
-      totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
+
+    for (FileSlice currentSlice : sortedFileSlices) {
+      long currentSize = currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
       // check if max size is reached and create new group, if needed.
-      if (totalSizeSoFar >= writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
+      if (totalSizeSoFar + currentSize > writeConfig.getClusteringMaxBytesInGroup() && !currentGroup.isEmpty()) {
         int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
         LOG.info("Adding one clustering group " + totalSizeSoFar + " max bytes: "
-            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
         fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
         currentGroup = new ArrayList<>();
         totalSizeSoFar = 0;
       }
+
+      // Add to the current file-group
       currentGroup.add(currentSlice);
-      // totalSizeSoFar could be 0 when new group was created in the previous conditional block.
-      // reset to the size of current slice, otherwise the number of output file group will become 0 even though current slice is present.
-      if (totalSizeSoFar == 0) {
-        totalSizeSoFar += currentSlice.getBaseFile().isPresent() ? currentSlice.getBaseFile().get().getFileSize() : writeConfig.getParquetMaxFileSize();
-      }
+      // assume each file group size is ~= parquet.max.file.size
+      totalSizeSoFar += currentSize;
     }
+
     if (!currentGroup.isEmpty()) {
-      int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
-      LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
-          + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
-      fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      if (currentGroup.size() > 1 || writeConfig.shouldClusteringSingleGroup()) {
+        int numOutputGroups = getNumberOfOutputFileGroups(totalSizeSoFar, writeConfig.getClusteringTargetFileMaxBytes());
+        LOG.info("Adding final clustering group " + totalSizeSoFar + " max bytes: "
+            + writeConfig.getClusteringMaxBytesInGroup() + " num input slices: " + currentGroup.size() + " output groups: " + numOutputGroups);
+        fileSliceGroups.add(Pair.of(currentGroup, numOutputGroups));
+      }
     }
-
-    return fileSliceGroups.stream().map(fileSliceGroup -> HoodieClusteringGroup.newBuilder()
-        .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
-        .setNumOutputFileGroups(fileSliceGroup.getRight())
-        .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
-        .build());
+
+    return fileSliceGroups.stream().map(fileSliceGroup ->
+        HoodieClusteringGroup.newBuilder()
+            .setSlices(getFileSliceInfo(fileSliceGroup.getLeft()))
+            .setNumOutputFileGroups(fileSliceGroup.getRight())
+            .setMetrics(buildMetrics(fileSliceGroup.getLeft()))
+            .build());
   }

   @Override