This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new f06ff5b3e0e [HUDI-7090] Set the maxParallelism for singleton operator (#10090) f06ff5b3e0e is described below commit f06ff5b3e0ee8bb6e49aad04d3b6054d6c46e272 Author: hehuiyuan <471627...@qq.com> AuthorDate: Fri Nov 17 09:43:21 2023 +0800 [HUDI-7090] Set the maxParallelism for singleton operator (#10090) --- .../hudi/sink/clustering/HoodieFlinkClusteringJob.java | 4 +++- .../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java | 4 +++- .../main/java/org/apache/hudi/sink/utils/Pipelines.java | 14 +++++++++++--- .../main/java/org/apache/hudi/table/HoodieTableSource.java | 1 + 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index a453cac6803..0966f6995bd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -348,7 +348,9 @@ public class HoodieFlinkClusteringJob { .addSink(new ClusteringCommitSink(conf)) .name("clustering_commit") .uid("uid_clustering_commit") - .setParallelism(1); + .setParallelism(1) + .getTransformation() + .setMaxParallelism(1); env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp()); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index 57e823ab21c..99dd45d94b4 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -298,7 +298,9 @@ public class HoodieFlinkCompactor { .addSink(new CompactionCommitSink(conf)) .name("compaction_commit") .uid("uid_compaction_commit") - .setParallelism(1); + .setParallelism(1) + .getTransformation() + .setMaxParallelism(1); env.execute("flink_hudi_compaction_" + String.join(",", compactionInstantTimes)); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index e66009aa551..b3acd4cfa11 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -410,10 +410,11 @@ public class Pipelines { * @return the compaction pipeline */ public static DataStreamSink<CompactionCommitEvent> compact(Configuration conf, DataStream<Object> dataStream) { - return dataStream.transform("compact_plan_generate", + DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = dataStream.transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class), new CompactionPlanOperator(conf)) .setParallelism(1) // plan generate must be singleton + .setMaxParallelism(1) // make the distribution strategy deterministic to avoid concurrent modifications // on the same bucket files .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId()) @@ -424,6 +425,8 @@ public class Pipelines { .addSink(new CompactionCommitSink(conf)) .name("compact_commit") .setParallelism(1); // compaction commit should be singleton + compactionCommitEventDataStream.getTransformation().setMaxParallelism(1); + return compactionCommitEventDataStream; } /** @@ -452,6 +455,7 @@ public class Pipelines { TypeInformation.of(ClusteringPlanEvent.class), new ClusteringPlanOperator(conf)) .setParallelism(1) // plan generate must be singleton + .setMaxParallelism(1) // plan generate must be singleton .keyBy(plan -> // make the distribution strategy deterministic to avoid concurrent modifications // on the same bucket files @@ -465,15 +469,19 @@ public class Pipelines { ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(), conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L); } - return clusteringStream.addSink(new ClusteringCommitSink(conf)) + DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = clusteringStream.addSink(new ClusteringCommitSink(conf)) .name("clustering_commit") .setParallelism(1); // clustering commit should be singleton + clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1); + return clusteringCommitEventDataStream; } public static DataStreamSink<Object> clean(Configuration conf, DataStream<Object> dataStream) { - return dataStream.addSink(new CleanFunction<>(conf)) + DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new CleanFunction<>(conf)) .setParallelism(1) .name("clean_commits"); + cleanCommitDataStream.getTransformation().setMaxParallelism(1); + return cleanCommitDataStream; } public static DataStreamSink<Object> dummySink(DataStream<Object> dataStream) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java index e4b8db33516..b4ef68a3939 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java @@ -207,6 +207,7 @@ public class HoodieTableSource implements SingleOutputStreamOperator<RowData> source = execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor")) .uid(Pipelines.opUID("split_monitor", conf)) .setParallelism(1) + .setMaxParallelism(1) .keyBy(MergeOnReadInputSplit::getFileId) .transform("split_reader", typeInfo, factory) .uid(Pipelines.opUID("split_reader", conf))