This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch release-1.19 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push: new 6caf576d582 [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. 6caf576d582 is described below commit 6caf576d582bf3b2fcb9c6ef71f46115c58ea59c Author: JunRuiLee <jrlee....@gmail.com> AuthorDate: Sun Jun 30 18:39:13 2024 +0800 [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2. --- .../adaptivebatch/AdaptiveBatchScheduler.java | 41 +++++++++++++++++----- .../adaptivebatch/AdaptiveBatchSchedulerTest.java | 12 +++---- .../translators/SinkTransformationTranslator.java | 4 ++- .../SinkTransformationTranslatorITCaseBase.java | 32 +++++++++++++++-- 4 files changed, 69 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java index 83fb50f1514..24696d66776 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java @@ -406,15 +406,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { final ExecutionJobVertex jobVertex, List<BlockingResultInfo> inputs) { int vertexInitialParallelism = jobVertex.getParallelism(); ForwardGroup forwardGroup = forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId()); - if (!jobVertex.isParallelismDecided() - && forwardGroup != null - && forwardGroup.isParallelismDecided()) { - vertexInitialParallelism = forwardGroup.getParallelism(); - log.info( - "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", - jobVertex.getName(), - jobVertex.getJobVertexId(), - vertexInitialParallelism); + if (!jobVertex.isParallelismDecided() && forwardGroup != null) { + checkState(!forwardGroup.isParallelismDecided()); } int vertexMinParallelism = ExecutionConfig.PARALLELISM_DEFAULT; @@ -453,6 +446,36 @@ public class AdaptiveBatchScheduler extends DefaultScheduler { if (forwardGroup != null && !forwardGroup.isParallelismDecided()) { forwardGroup.setParallelism(parallelismAndInputInfos.getParallelism()); + + // When the parallelism for a forward group is determined, we ensure that the + // parallelism for all job vertices within that group is also set. + // This approach ensures that each forward edge produces single subpartition. + // + // This setting is crucial because the Sink V2 committer relies on the interplay + // between the CommittableSummary and the CommittableWithLineage, which are sent by + // the upstream Sink V2 Writer. The committer expects to receive CommittableSummary + // before CommittableWithLineage. + // + // If the number of subpartitions produced by a forward edge is greater than one, + // the ordering of these elements received by the committer cannot be assured, which + // would break the assumption that CommittableSummary is received before + // CommittableWithLineage. + for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) { + ExecutionJobVertex executionJobVertex = getExecutionJobVertex(jobVertexId); + if (!executionJobVertex.isParallelismDecided()) { + log.info( + "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.", + executionJobVertex.getName(), + executionJobVertex.getJobVertexId(), + parallelismAndInputInfos.getParallelism()); + changeJobVertexParallelism( + executionJobVertex, parallelismAndInputInfos.getParallelism()); + } else { + checkState( + parallelismAndInputInfos.getParallelism() + == executionJobVertex.getParallelism()); + } + } } return parallelismAndInputInfos; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java index 7930aedc315..5c18de43f54 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java @@ -186,20 +186,18 @@ class AdaptiveBatchSchedulerTest { // trigger source finished. transitionExecutionsState(scheduler, ExecutionState.FINISHED, source); assertThat(mapExecutionJobVertex.getParallelism()).isEqualTo(5); - assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(-1); + assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(5); + // check that the jobGraph is updated + assertThat(sink.getParallelism()).isEqualTo(5); // trigger map finished. transitionExecutionsState(scheduler, ExecutionState.FINISHED, map); assertThat(mapExecutionJobVertex.getParallelism()).isEqualTo(5); assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(5); - // check that the jobGraph is updated - assertThat(sink.getParallelism()).isEqualTo(5); - // check aggregatedInputDataBytes of each ExecutionVertex calculated. Total number of - // subpartitions of map is ceil(128 / 5) * 5 = 130, so total bytes sink consume is 130 * - // SUBPARTITION_BYTES = 13_000L. - checkAggregatedInputDataBytesIsCalculated(sinkExecutionJobVertex, 13_000L); + // subpartitions of map is 5, so total bytes sink consume is 5 * SUBPARTITION_BYTES = 500L. + checkAggregatedInputDataBytesIsCalculated(sinkExecutionJobVertex, 500L); } @Test diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java index e924086a1d8..aee412dc8a1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java @@ -361,7 +361,9 @@ public class SinkTransformationTranslator<Input, Output> // In this case, the subTransformation does not contain any customized // parallelism value and will therefore inherit the parallelism value // from the sinkTransformation. - subTransformation.setParallelism(transformation.getParallelism()); + subTransformation.setParallelism( + transformation.getParallelism(), + transformation.isParallelismConfigured()); } if (subTransformation.getMaxParallelism() < 0 diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java index 3c93b178b51..bcfa377595d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java @@ -121,6 +121,24 @@ public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends Test -1); } + @Test + public void testParallelismConfigured() { + testParallelismConfiguredInternal(true); + + testParallelismConfiguredInternal(false); + } + + private void testParallelismConfiguredInternal(boolean setSinkParallelism) { + final StreamGraph streamGraph = + buildGraph(sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism); + + final StreamNode writerNode = findWriter(streamGraph); + final StreamNode committerNode = findCommitter(streamGraph); + + assertThat(writerNode.isParallelismConfigured(), equalTo(setSinkParallelism)); + assertThat(committerNode.isParallelismConfigured(), equalTo(setSinkParallelism)); + } + StreamNode findWriter(StreamGraph streamGraph) { return findNodeName( streamGraph, name -> name.contains("Writer") && !name.contains("Committer")); @@ -196,6 +214,11 @@ public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends Test } StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) { + return buildGraph(sink, runtimeExecutionMode, true); + } + + StreamGraph buildGraph( + SinkT sink, RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final Configuration config = new Configuration(); @@ -203,16 +226,19 @@ public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends Test env.configure(config, getClass().getClassLoader()); final DataStreamSource<Integer> src = env.fromElements(1, 2); final DataStreamSink<Integer> dataStreamSink = sinkTo(src.rebalance(), sink); - setSinkProperty(dataStreamSink); + setSinkProperty(dataStreamSink, setSinkParallelism); // Trigger the plan generation but do not clear the transformations env.getExecutionPlan(); return env.getStreamGraph(); } - private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) { + private void setSinkProperty( + DataStreamSink<Integer> dataStreamSink, boolean setSinkParallelism) { dataStreamSink.name(NAME); dataStreamSink.uid(UID); - dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM); + if (setSinkParallelism) { + dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM); + } dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP); }