This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 6caf576d582 [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2.
6caf576d582 is described below

commit 6caf576d582bf3b2fcb9c6ef71f46115c58ea59c
Author: JunRuiLee <jrlee....@gmail.com>
AuthorDate: Sun Jun 30 18:39:13 2024 +0800

    [FLINK-35731][runtime] Fix incorrect parallelism configured detection for Sink V2.
---
 .../adaptivebatch/AdaptiveBatchScheduler.java      | 41 +++++++++++++++++-----
 .../adaptivebatch/AdaptiveBatchSchedulerTest.java  | 12 +++----
 .../translators/SinkTransformationTranslator.java  |  4 ++-
 .../SinkTransformationTranslatorITCaseBase.java    | 32 +++++++++++++++--
 4 files changed, 69 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 83fb50f1514..24696d66776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -406,15 +406,8 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
             final ExecutionJobVertex jobVertex, List<BlockingResultInfo> inputs) {
         int vertexInitialParallelism = jobVertex.getParallelism();
         ForwardGroup forwardGroup = forwardGroupsByJobVertexId.get(jobVertex.getJobVertexId());
-        if (!jobVertex.isParallelismDecided()
-                && forwardGroup != null
-                && forwardGroup.isParallelismDecided()) {
-            vertexInitialParallelism = forwardGroup.getParallelism();
-            log.info(
-                    "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.",
-                    jobVertex.getName(),
-                    jobVertex.getJobVertexId(),
-                    vertexInitialParallelism);
+        if (!jobVertex.isParallelismDecided() && forwardGroup != null) {
+            checkState(!forwardGroup.isParallelismDecided());
         }
 
         int vertexMinParallelism = ExecutionConfig.PARALLELISM_DEFAULT;
@@ -453,6 +446,36 @@ public class AdaptiveBatchScheduler extends DefaultScheduler {
 
         if (forwardGroup != null && !forwardGroup.isParallelismDecided()) {
             forwardGroup.setParallelism(parallelismAndInputInfos.getParallelism());
+
+            // When the parallelism for a forward group is determined, we ensure that the
+            // parallelism for all job vertices within that group is also set.
+            // This approach ensures that each forward edge produces single subpartition.
+            //
+            // This setting is crucial because the Sink V2 committer relies on the interplay
+            // between the CommittableSummary and the CommittableWithLineage, which are sent by
+            // the upstream Sink V2 Writer. The committer expects to receive CommittableSummary
+            // before CommittableWithLineage.
+            //
+            // If the number of subpartitions produced by a forward edge is greater than one,
+            // the ordering of these elements received by the committer cannot be assured, which
+            // would break the assumption that CommittableSummary is received before
+            // CommittableWithLineage.
+            for (JobVertexID jobVertexId : forwardGroup.getJobVertexIds()) {
+                ExecutionJobVertex executionJobVertex = getExecutionJobVertex(jobVertexId);
+                if (!executionJobVertex.isParallelismDecided()) {
+                    log.info(
+                            "Parallelism of JobVertex: {} ({}) is decided to be {} according to forward group's parallelism.",
+                            executionJobVertex.getName(),
+                            executionJobVertex.getJobVertexId(),
+                            parallelismAndInputInfos.getParallelism());
+                    changeJobVertexParallelism(
+                            executionJobVertex, parallelismAndInputInfos.getParallelism());
+                } else {
+                    checkState(
+                            parallelismAndInputInfos.getParallelism()
+                                    == executionJobVertex.getParallelism());
+                }
+            }
         }
 
         return parallelismAndInputInfos;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
index 7930aedc315..5c18de43f54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerTest.java
@@ -186,20 +186,18 @@ class AdaptiveBatchSchedulerTest {
         // trigger source finished.
         transitionExecutionsState(scheduler, ExecutionState.FINISHED, source);
         assertThat(mapExecutionJobVertex.getParallelism()).isEqualTo(5);
-        assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(-1);
+        assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(5);
+        // check that the jobGraph is updated
+        assertThat(sink.getParallelism()).isEqualTo(5);
 
         // trigger map finished.
         transitionExecutionsState(scheduler, ExecutionState.FINISHED, map);
         assertThat(mapExecutionJobVertex.getParallelism()).isEqualTo(5);
         assertThat(sinkExecutionJobVertex.getParallelism()).isEqualTo(5);
 
-        // check that the jobGraph is updated
-        assertThat(sink.getParallelism()).isEqualTo(5);
-
         // check aggregatedInputDataBytes of each ExecutionVertex calculated. Total number of
-        // subpartitions of map is ceil(128 / 5) * 5 = 130, so total bytes sink consume is 130 *
-        // SUBPARTITION_BYTES = 13_000L.
-        checkAggregatedInputDataBytesIsCalculated(sinkExecutionJobVertex, 13_000L);
+        // subpartitions of map is 5, so total bytes sink consume is 5 * SUBPARTITION_BYTES = 500L.
+        checkAggregatedInputDataBytesIsCalculated(sinkExecutionJobVertex, 500L);
     }
 
     @Test
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index e924086a1d8..aee412dc8a1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -361,7 +361,9 @@ public class SinkTransformationTranslator<Input, Output>
                     // In this case, the subTransformation does not contain any customized
                     // parallelism value and will therefore inherit the parallelism value
                     // from the sinkTransformation.
-                    subTransformation.setParallelism(transformation.getParallelism());
+                    subTransformation.setParallelism(
+                            transformation.getParallelism(),
+                            transformation.isParallelismConfigured());
                 }
 
                 if (subTransformation.getMaxParallelism() < 0
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
index 3c93b178b51..bcfa377595d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
@@ -121,6 +121,24 @@ public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends Test
                 -1);
     }
 
+    @Test
+    public void testParallelismConfigured() {
+        testParallelismConfiguredInternal(true);
+
+        testParallelismConfiguredInternal(false);
+    }
+
+    private void testParallelismConfiguredInternal(boolean setSinkParallelism) {
+        final StreamGraph streamGraph =
+                buildGraph(sinkWithCommitter(), runtimeExecutionMode, setSinkParallelism);
+
+        final StreamNode writerNode = findWriter(streamGraph);
+        final StreamNode committerNode = findCommitter(streamGraph);
+
+        assertThat(writerNode.isParallelismConfigured(), equalTo(setSinkParallelism));
+        assertThat(committerNode.isParallelismConfigured(), equalTo(setSinkParallelism));
+    }
+
     StreamNode findWriter(StreamGraph streamGraph) {
         return findNodeName(
                 streamGraph, name -> name.contains("Writer") && !name.contains("Committer"));
@@ -196,6 +214,11 @@ public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends Test
     }
 
     StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode runtimeExecutionMode) {
+        return buildGraph(sink, runtimeExecutionMode, true);
+    }
+
+    StreamGraph buildGraph(
+            SinkT sink, RuntimeExecutionMode runtimeExecutionMode, boolean setSinkParallelism) {
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
         final Configuration config = new Configuration();
@@ -203,16 +226,19 @@ public abstract class SinkTransformationTranslatorITCaseBase<SinkT> extends Test
         env.configure(config, getClass().getClassLoader());
         final DataStreamSource<Integer> src = env.fromElements(1, 2);
         final DataStreamSink<Integer> dataStreamSink = sinkTo(src.rebalance(), sink);
-        setSinkProperty(dataStreamSink);
+        setSinkProperty(dataStreamSink, setSinkParallelism);
         // Trigger the plan generation but do not clear the transformations
         env.getExecutionPlan();
         return env.getStreamGraph();
     }
 
-    private void setSinkProperty(DataStreamSink<Integer> dataStreamSink) {
+    private void setSinkProperty(
+            DataStreamSink<Integer> dataStreamSink, boolean setSinkParallelism) {
         dataStreamSink.name(NAME);
         dataStreamSink.uid(UID);
-        dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM);
+        if (setSinkParallelism) {
+            dataStreamSink.setParallelism(SinkTransformationTranslatorITCaseBase.PARALLELISM);
+        }
         dataStreamSink.slotSharingGroup(SLOT_SHARE_GROUP);
     }
 

Reply via email to