This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b36e7c45990 [HUDI-6293] Make HoodieClusteringJob's parallelism of clustering_task more reasonable (#8866)
b36e7c45990 is described below

commit b36e7c459904860b0be086c144ba0b175961e805
Author: voonhous <voonho...@gmail.com>
AuthorDate: Fri Jun 2 10:52:04 2023 +0800

    [HUDI-6293] Make HoodieClusteringJob's parallelism of clustering_task more reasonable (#8866)
---
 .../org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 633f06b0e4f..223f85defca 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -310,9 +310,12 @@ public class HoodieFlinkClusteringJob {
 
       HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
 
+      int inputGroupSize = clusteringPlan.getInputGroups().size();
+
       // get clusteringParallelism.
       int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
-          ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+          ? inputGroupSize
+          : Math.min(conf.getInteger(FlinkOptions.CLUSTERING_TASKS), inputGroupSize);
 
       // Mark instant as clustering inflight
       table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());

Reply via email to