This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f06ff5b3e0e [HUDI-7090] Set the maxParallelism for singleton operator  
(#10090)
f06ff5b3e0e is described below

commit f06ff5b3e0ee8bb6e49aad04d3b6054d6c46e272
Author: hehuiyuan <471627...@qq.com>
AuthorDate: Fri Nov 17 09:43:21 2023 +0800

    [HUDI-7090] Set the maxParallelism for singleton operator  (#10090)
---
 .../hudi/sink/clustering/HoodieFlinkClusteringJob.java     |  4 +++-
 .../org/apache/hudi/sink/compact/HoodieFlinkCompactor.java |  4 +++-
 .../main/java/org/apache/hudi/sink/utils/Pipelines.java    | 14 +++++++++++---
 .../main/java/org/apache/hudi/table/HoodieTableSource.java |  1 +
 4 files changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
index a453cac6803..0966f6995bd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java
@@ -348,7 +348,9 @@ public class HoodieFlinkClusteringJob {
           .addSink(new ClusteringCommitSink(conf))
           .name("clustering_commit")
           .uid("uid_clustering_commit")
-          .setParallelism(1);
+          .setParallelism(1)
+          .getTransformation()
+          .setMaxParallelism(1);
 
       env.execute("flink_hudi_clustering_" + clusteringInstant.getTimestamp());
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
index 57e823ab21c..99dd45d94b4 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java
@@ -298,7 +298,9 @@ public class HoodieFlinkCompactor {
           .addSink(new CompactionCommitSink(conf))
           .name("compaction_commit")
           .uid("uid_compaction_commit")
-          .setParallelism(1);
+          .setParallelism(1)
+          .getTransformation()
+          .setMaxParallelism(1);
 
       env.execute("flink_hudi_compaction_" + String.join(",", 
compactionInstantTimes));
     }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index e66009aa551..b3acd4cfa11 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -410,10 +410,11 @@ public class Pipelines {
    * @return the compaction pipeline
    */
   public static DataStreamSink<CompactionCommitEvent> compact(Configuration 
conf, DataStream<Object> dataStream) {
-    return dataStream.transform("compact_plan_generate",
+    DataStreamSink<CompactionCommitEvent> compactionCommitEventDataStream = 
dataStream.transform("compact_plan_generate",
             TypeInformation.of(CompactionPlanEvent.class),
             new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        .setMaxParallelism(1)
         // make the distribution strategy deterministic to avoid concurrent 
modifications
         // on the same bucket files
         .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
@@ -424,6 +425,8 @@ public class Pipelines {
         .addSink(new CompactionCommitSink(conf))
         .name("compact_commit")
         .setParallelism(1); // compaction commit should be singleton
+    compactionCommitEventDataStream.getTransformation().setMaxParallelism(1);
+    return compactionCommitEventDataStream;
   }
 
   /**
@@ -452,6 +455,7 @@ public class Pipelines {
             TypeInformation.of(ClusteringPlanEvent.class),
             new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        .setMaxParallelism(1) // plan generate must be singleton
         .keyBy(plan ->
             // make the distribution strategy deterministic to avoid 
concurrent modifications
             // on the same bucket files
@@ -465,15 +469,19 @@ public class Pipelines {
       ExecNodeUtil.setManagedMemoryWeight(clusteringStream.getTransformation(),
           conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
     }
-    return clusteringStream.addSink(new ClusteringCommitSink(conf))
+    DataStreamSink<ClusteringCommitEvent> clusteringCommitEventDataStream = 
clusteringStream.addSink(new ClusteringCommitSink(conf))
         .name("clustering_commit")
         .setParallelism(1); // clustering commit should be singleton
+    clusteringCommitEventDataStream.getTransformation().setMaxParallelism(1);
+    return clusteringCommitEventDataStream;
   }
 
   public static DataStreamSink<Object> clean(Configuration conf, 
DataStream<Object> dataStream) {
-    return dataStream.addSink(new CleanFunction<>(conf))
+    DataStreamSink<Object> cleanCommitDataStream = dataStream.addSink(new 
CleanFunction<>(conf))
         .setParallelism(1)
         .name("clean_commits");
+    cleanCommitDataStream.getTransformation().setMaxParallelism(1);
+    return cleanCommitDataStream;
   }
 
   public static DataStreamSink<Object> dummySink(DataStream<Object> 
dataStream) {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index e4b8db33516..b4ef68a3939 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -207,6 +207,7 @@ public class HoodieTableSource implements
           SingleOutputStreamOperator<RowData> source = 
execEnv.addSource(monitoringFunction, getSourceOperatorName("split_monitor"))
               .uid(Pipelines.opUID("split_monitor", conf))
               .setParallelism(1)
+              .setMaxParallelism(1)
               .keyBy(MergeOnReadInputSplit::getFileId)
               .transform("split_reader", typeInfo, factory)
               .uid(Pipelines.opUID("split_reader", conf))

Reply via email to