This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new b36e7c45990 [HUDI-6293] Make HoodieClusteringJob's parallelism of clustering_task more reasonable (#8866)
b36e7c45990 is described below

commit b36e7c459904860b0be086c144ba0b175961e805
Author: voonhous <voonho...@gmail.com>
AuthorDate: Fri Jun 2 10:52:04 2023 +0800

    [HUDI-6293] Make HoodieClusteringJob's parallelism of clustering_task more reasonable (#8866)
---
 .../org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index 633f06b0e4f..223f85defca 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -310,9 +310,12 @@ public class HoodieFlinkClusteringJob {
 
     HoodieInstant instant = HoodieTimeline.getReplaceCommitRequestedInstant(clusteringInstant.getTimestamp());
 
+    int inputGroupSize = clusteringPlan.getInputGroups().size();
+
     // get clusteringParallelism.
     int clusteringParallelism = conf.getInteger(FlinkOptions.CLUSTERING_TASKS) == -1
-        ? clusteringPlan.getInputGroups().size() : conf.getInteger(FlinkOptions.CLUSTERING_TASKS);
+        ? inputGroupSize
+        : Math.min(conf.getInteger(FlinkOptions.CLUSTERING_TASKS), inputGroupSize);
 
     // Mark instant as clustering inflight
     table.getActiveTimeline().transitionReplaceRequestedToInflight(instant, Option.empty());