This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 00d50e91abe [HUDI-6293] Make HoodieFlinkCompactor's parallelism of compact_task more reasonable (#8854)
00d50e91abe is described below

commit 00d50e91abe24aba31daa2fe2806de5414f03c77
Author: Dongsj <90449228+eric9...@users.noreply.github.com>
AuthorDate: Thu Jun 1 11:38:27 2023 +0800

    [HUDI-6293] Make HoodieFlinkCompactor's parallelism of compact_task more reasonable (#8854)
    
    Co-authored-by: dongsj <don...@asiainfo.com>
---
 .../java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 63dfd26c4ac..e396897dc7e 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -274,10 +274,12 @@ public class HoodieFlinkCompactor {
 
       List<HoodieInstant> instants = 
compactionInstantTimes.stream().map(HoodieTimeline::getCompactionRequestedInstant).collect(Collectors.toList());
 
+      int totalOperations = 
Math.toIntExact(compactionPlans.stream().mapToLong(pair -> 
pair.getRight().getOperations().size()).sum());
+
       // get compactionParallelism.
       int compactionParallelism = 
conf.getInteger(FlinkOptions.COMPACTION_TASKS) == -1
-          ? Math.toIntExact(compactionPlans.stream().mapToLong(pair -> 
pair.getRight().getOperations().size()).sum())
-          : conf.getInteger(FlinkOptions.COMPACTION_TASKS);
+          ? totalOperations
+          : Math.min(conf.getInteger(FlinkOptions.COMPACTION_TASKS), 
totalOperations);
 
       LOG.info("Start to compaction for instant " + compactionInstantTimes);
 

Reply via email to