This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.20 by this push:
     new 7fbe72488ce [FLINK-37870][checkpoint] Fix the bug that unaligned checkpoint is disabled for all connections unexpectedly
7fbe72488ce is described below

commit 7fbe72488ce2f572354906ff713313b750c390db
Author: Rui Fan <[email protected]>
AuthorDate: Tue Jun 3 14:06:24 2025 +0200

    [FLINK-37870][checkpoint] Fix the bug that unaligned checkpoint is disabled for all connections unexpectedly
---
 .../translators/SinkTransformationTranslator.java  |  4 +-
 .../SinkTransformationTranslatorITCaseBase.java    |  5 +++
 .../flink/test/streaming/runtime/SinkV2ITCase.java | 50 ++++++++++++++++++++--
 3 files changed, 54 insertions(+), 5 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index dd41218a497..1110894a8be 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -225,7 +225,9 @@ public class SinkTransformationTranslator<Input, Output>
 
             // check all transformation after the writer and recursively 
disable UC for all inputs
             // up to the writer
-            Set<Integer> seen = new HashSet<>(writer.getId());
+            Set<Integer> seen = new HashSet<>(sinkTransformations.size() * 2);
+            seen.add(writer.getId());
+
             Queue<Transformation<?>> pending =
                     new ArrayDeque<>(
                             sinkTransformations.subList(
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
index 5f976e2684c..f8a3f3141b7 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
@@ -117,6 +117,7 @@ abstract class 
SinkTransformationTranslatorITCaseBase<SinkT> {
 
         assertThat(streamGraph.getStreamNodes()).hasSize(3);
         assertNoUnalignedOutput(writerNode);
+        assertUnalignedOutput(sourceNode);
 
         validateTopology(
                 writerNode,
@@ -281,6 +282,10 @@ abstract class 
SinkTransformationTranslatorITCaseBase<SinkT> {
         assertThat(src.getOutEdges()).allMatch(e -> 
!e.supportsUnalignedCheckpoints());
     }
 
+    protected static void assertUnalignedOutput(StreamNode src) {
+        
assertThat(src.getOutEdges()).allMatch(StreamEdge::supportsUnalignedCheckpoints);
+    }
+
     StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode 
runtimeExecutionMode) {
         return buildGraph(sink, runtimeExecutionMode, true);
     }
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 724a72e8b7f..1695694a04e 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -20,8 +20,12 @@ package org.apache.flink.test.streaming.runtime;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
 import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
 import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
 import org.apache.flink.streaming.util.FiniteTestSource;
 import org.apache.flink.test.util.AbstractTestBaseJUnit4;
@@ -88,6 +92,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                 new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, 
SOURCE_DATA);
 
         env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+                // Introduce the keyBy to assert unaligned checkpoint is 
enabled on the source ->
+                // sink writer edge
+                .keyBy((KeySelector<Integer, Integer>) value -> value)
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
                                 .setDefaultCommitter(
@@ -95,7 +102,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                                                         & Serializable)
                                                 () -> COMMIT_QUEUE)
                                 .build());
-        env.execute();
+        executeAndVerifyStreamGraph(env);
         assertThat(
                 COMMIT_QUEUE.stream()
                         .map(Committer.CommitRequest::getCommittable)
@@ -110,6 +117,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                 new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA, 
SOURCE_DATA);
 
         env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+                // Introduce the keyBy to assert unaligned checkpoint is 
enabled on the source ->
+                // sink writer edge
+                .keyBy((KeySelector<Integer, Integer>) value -> value)
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
                                 .setDefaultCommitter(
@@ -118,7 +128,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                                                 () -> COMMIT_QUEUE)
                                 .setWithPreCommitTopology(true)
                                 .build());
-        env.execute();
+        executeAndVerifyStreamGraph(env);
         assertThat(
                 COMMIT_QUEUE.stream()
                         .map(Committer.CommitRequest::getCommittable)
@@ -134,6 +144,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
+                // Introduce the rebalance to assert unaligned checkpoint is 
enabled on the source
+                // -> sink writer edge
+                .rebalance()
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
                                 .setDefaultCommitter(
@@ -141,7 +154,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                                                         & Serializable)
                                                 () -> COMMIT_QUEUE)
                                 .build());
-        env.execute();
+        executeAndVerifyStreamGraph(env);
         assertThat(
                 COMMIT_QUEUE.stream()
                         .map(Committer.CommitRequest::getCommittable)
@@ -154,6 +167,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
         final StreamExecutionEnvironment env = buildBatchEnv();
 
         env.fromData(SOURCE_DATA)
+                // Introduce the rebalance to assert unaligned checkpoint is 
enabled on the source
+                // -> sink writer edge
+                .rebalance()
                 .sinkTo(
                         TestSinkV2.<Integer>newBuilder()
                                 .setDefaultCommitter(
@@ -162,7 +178,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
                                                 () -> COMMIT_QUEUE)
                                 .setWithPreCommitTopology(true)
                                 .build());
-        env.execute();
+        executeAndVerifyStreamGraph(env);
         assertThat(
                 COMMIT_QUEUE.stream()
                         .map(Committer.CommitRequest::getCommittable)
@@ -185,4 +201,30 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
         return env;
     }
+
+    private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) 
throws Exception {
+        StreamGraph streamGraph = env.getStreamGraph();
+        assertNoUnalignedCheckpointInSink(streamGraph);
+        assertUnalignedCheckpointInNonSink(streamGraph);
+        env.execute(streamGraph);
+    }
+
+    private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) {
+        // all the out edges between sink nodes should not support unaligned 
checkpoints
+        
org.assertj.core.api.Assertions.assertThat(streamGraph.getStreamNodes())
+                .filteredOn(t -> t.getOperatorName().contains("Sink"))
+                .flatMap(StreamNode::getOutEdges)
+                .allMatch(e -> !e.supportsUnalignedCheckpoints())
+                .isNotEmpty();
+    }
+
+    private void assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) {
+        // All connections are rebalance between source and sink, so all the out edges of nodes
+        // upstream of the sink should support unaligned checkpoints
+        
org.assertj.core.api.Assertions.assertThat(streamGraph.getStreamNodes())
+                .filteredOn(t -> !t.getOperatorName().contains("Sink"))
+                .flatMap(StreamNode::getOutEdges)
+                .allMatch(StreamEdge::supportsUnalignedCheckpoints)
+                .isNotEmpty();
+    }
 }

Reply via email to