This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 00d50e91abe [HUDI-6293] Make HoodieFlinkCompactor's parallelism of compact_task more reasonable (#8854)

00d50e91abe is described below

commit 00d50e91abe24aba31daa2fe2806de5414f03c77
Author: Dongsj <90449228+eric9...@users.noreply.github.com>
AuthorDate: Thu Jun 1 11:38:27 2023 +0800

    [HUDI-6293] Make HoodieFlinkCompactor's parallelism of compact_task more reasonable (#8854)

    Co-authored-by: dongsj <don...@asiainfo.com>
---
 .../java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 63dfd26c4ac..e396897dc7e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -274,10 +274,12 @@ public class HoodieFlinkCompactor {
     List<HoodieInstant> instants = compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
+    int totalOperations = Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum());
     // get compactionParallelism.
     int compactionParallelism = conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
-        ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> pair.getRight().getOperations().size()).sum())
-        : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
+        ? totalOperations
+        : Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), totalOperations);
     LOG.info("Start to compaction for instant " + compactionInstantTimes);