This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6a0d6fa4de610ded1220845365c59a84831bc454 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Wed Feb 9 18:56:41 2022 +0800 [hotfix][core] Do not set parallelism without checking whether the parallelism is set when translating Sink. --- .../runtime/translators/SinkTransformationTranslator.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index d2efbb8..97c19d3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -27,7 +27,6 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo; -import org.apache.flink.streaming.api.connector.sink2.StandardSinkTopologies; import org.apache.flink.streaming.api.connector.sink2.WithPostCommitTopology; import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology; import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology; @@ -233,16 +232,6 @@ public class SinkTransformationTranslator<Input, Output> transformations.subList(numTransformsBefore, transformations.size()); for (Transformation<?> subTransformation : expandedTransformations) { - // Skip overwriting the parallelism for the global committer - if (subTransformation.getName() == null - || !subTransformation - .getName() - .equals( - StandardSinkTopologies - .GLOBAL_COMMITTER_TRANSFORMATION_NAME)) { - subTransformation.setParallelism(transformation.getParallelism()); - } - concatUid( subTransformation, Transformation::getUid,