This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-1.20
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.20 by this push:
new 7fbe72488ce [FLINK-37870][checkpoint] Fix the bug that unaligned
checkpoint is disabled for all connections unexpectedly
7fbe72488ce is described below
commit 7fbe72488ce2f572354906ff713313b750c390db
Author: Rui Fan <[email protected]>
AuthorDate: Tue Jun 3 14:06:24 2025 +0200
[FLINK-37870][checkpoint] Fix the bug that unaligned checkpoint is disabled
for all connections unexpectedly
---
.../translators/SinkTransformationTranslator.java | 4 +-
.../SinkTransformationTranslatorITCaseBase.java | 5 +++
.../flink/test/streaming/runtime/SinkV2ITCase.java | 50 ++++++++++++++++++++--
3 files changed, 54 insertions(+), 5 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index dd41218a497..1110894a8be 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -225,7 +225,9 @@ public class SinkTransformationTranslator<Input, Output>
// check all transformation after the writer and recursively
disable UC for all inputs
// up to the writer
- Set<Integer> seen = new HashSet<>(writer.getId());
+ Set<Integer> seen = new HashSet<>(sinkTransformations.size() * 2);
+ seen.add(writer.getId());
+
Queue<Transformation<?>> pending =
new ArrayDeque<>(
sinkTransformations.subList(
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
index 5f976e2684c..f8a3f3141b7 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkTransformationTranslatorITCaseBase.java
@@ -117,6 +117,7 @@ abstract class
SinkTransformationTranslatorITCaseBase<SinkT> {
assertThat(streamGraph.getStreamNodes()).hasSize(3);
assertNoUnalignedOutput(writerNode);
+ assertUnalignedOutput(sourceNode);
validateTopology(
writerNode,
@@ -281,6 +282,10 @@ abstract class
SinkTransformationTranslatorITCaseBase<SinkT> {
assertThat(src.getOutEdges()).allMatch(e ->
!e.supportsUnalignedCheckpoints());
}
+ protected static void assertUnalignedOutput(StreamNode src) {
+
assertThat(src.getOutEdges()).allMatch(StreamEdge::supportsUnalignedCheckpoints);
+ }
+
StreamGraph buildGraph(SinkT sink, RuntimeExecutionMode
runtimeExecutionMode) {
return buildGraph(sink, runtimeExecutionMode, true);
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 724a72e8b7f..1695694a04e 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -20,8 +20,12 @@ package org.apache.flink.test.streaming.runtime;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.connector.sink2.Committer;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.util.FiniteTestSource;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
@@ -88,6 +92,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA,
SOURCE_DATA);
env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+ // Introduce the keyBy to assert unaligned checkpoint is
enabled on the source ->
+ // sink writer edge
+ .keyBy((KeySelector<Integer, Integer>) value -> value)
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setDefaultCommitter(
@@ -95,7 +102,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
& Serializable)
() -> COMMIT_QUEUE)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(
COMMIT_QUEUE.stream()
.map(Committer.CommitRequest::getCommittable)
@@ -110,6 +117,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
new FiniteTestSource<>(COMMIT_QUEUE_RECEIVE_ALL_DATA,
SOURCE_DATA);
env.addSource(source, IntegerTypeInfo.INT_TYPE_INFO)
+ // Introduce the keyBy to assert unaligned checkpoint is
enabled on the source ->
+ // sink writer edge
+ .keyBy((KeySelector<Integer, Integer>) value -> value)
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setDefaultCommitter(
@@ -118,7 +128,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
() -> COMMIT_QUEUE)
.setWithPreCommitTopology(true)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(
COMMIT_QUEUE.stream()
.map(Committer.CommitRequest::getCommittable)
@@ -134,6 +144,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
final StreamExecutionEnvironment env = buildBatchEnv();
env.fromData(SOURCE_DATA)
+ // Introduce the rebalance to assert unaligned checkpoint is
enabled on the source
+ // -> sink writer edge
+ .rebalance()
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setDefaultCommitter(
@@ -141,7 +154,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
& Serializable)
() -> COMMIT_QUEUE)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(
COMMIT_QUEUE.stream()
.map(Committer.CommitRequest::getCommittable)
@@ -154,6 +167,9 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
final StreamExecutionEnvironment env = buildBatchEnv();
env.fromData(SOURCE_DATA)
+ // Introduce the rebalance to assert unaligned checkpoint is
enabled on the source
+ // -> sink writer edge
+ .rebalance()
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setDefaultCommitter(
@@ -162,7 +178,7 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
() -> COMMIT_QUEUE)
.setWithPreCommitTopology(true)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(
COMMIT_QUEUE.stream()
.map(Committer.CommitRequest::getCommittable)
@@ -185,4 +201,30 @@ public class SinkV2ITCase extends AbstractTestBaseJUnit4 {
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
return env;
}
+
+ private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env)
throws Exception {
+ StreamGraph streamGraph = env.getStreamGraph();
+ assertNoUnalignedCheckpointInSink(streamGraph);
+ assertUnalignedCheckpointInNonSink(streamGraph);
+ env.execute(streamGraph);
+ }
+
+ private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) {
+ // None of the out edges of the sink nodes should support unaligned checkpoints
+
org.assertj.core.api.Assertions.assertThat(streamGraph.getStreamNodes())
+ .filteredOn(t -> t.getOperatorName().contains("Sink"))
+ .flatMap(StreamNode::getOutEdges)
+ .allMatch(e -> !e.supportsUnalignedCheckpoints())
+ .isNotEmpty();
+ }
+
+ private void assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) {
+ // All connections between the source and the sink are keyBy or rebalance, so all the
+ // out edges of nodes upstream of the sink should support unaligned checkpoints
+
org.assertj.core.api.Assertions.assertThat(streamGraph.getStreamNodes())
+ .filteredOn(t -> !t.getOperatorName().contains("Sink"))
+ .flatMap(StreamNode::getOutEdges)
+ .allMatch(StreamEdge::supportsUnalignedCheckpoints)
+ .isNotEmpty();
+ }
}