This is an automated email from the ASF dual-hosted git repository.
fanrui pushed a commit to branch release-2.0
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.0 by this push:
new 0c57fbc41c6 [FLINK-37870][checkpoint] Fix the bug that unaligned
checkpoint is disabled for all connections unexpectedly
0c57fbc41c6 is described below
commit 0c57fbc41c6bc556be31daa2bfb924450008ccf6
Author: Rui Fan <[email protected]>
AuthorDate: Tue Jun 3 14:06:24 2025 +0200
[FLINK-37870][checkpoint] Fix the bug that unaligned checkpoint is disabled
for all connections unexpectedly
---
.../translators/SinkTransformationTranslator.java | 4 +-
.../SinkV2TransformationTranslatorITCase.java | 7 ++-
.../flink/test/streaming/runtime/SinkV2ITCase.java | 50 ++++++++++++++++++++--
3 files changed, 55 insertions(+), 6 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
index 4dfdf2ca319..26510f5fff3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/translators/SinkTransformationTranslator.java
@@ -223,7 +223,9 @@ public class SinkTransformationTranslator<Input, Output>
// check all transformation after the writer and recursively disable UC for all inputs
// up to the writer
- Set<Integer> seen = new HashSet<>(writer.getId());
+ Set<Integer> seen = new HashSet<>(sinkTransformations.size() * 2);
+ seen.add(writer.getId());
+
Queue<Transformation<?>> pending =
new ArrayDeque<>(
sinkTransformations.subList(
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
index 8912281a080..6a1f418b216 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/SinkV2TransformationTranslatorITCase.java
@@ -57,10 +57,14 @@ class SinkV2TransformationTranslatorITCase {
static final String UID = "FileUid";
static final int PARALLELISM = 2;
- protected static void assertNoUnalignedOutput(StreamNode src) {
+ private static void assertNoUnalignedOutput(StreamNode src) {
assertThat(src.getOutEdges()).allMatch(e -> !e.supportsUnalignedCheckpoints());
}
+ private static void assertUnalignedOutput(StreamNode src) {
+ assertThat(src.getOutEdges()).allMatch(StreamEdge::supportsUnalignedCheckpoints);
+ }
+
Sink<Integer> simpleSink() {
return TestSinkV2.<Integer>newBuilder().build();
}
@@ -181,6 +185,7 @@ class SinkV2TransformationTranslatorITCase {
assertThat(streamGraph.getStreamNodes()).hasSize(3);
assertNoUnalignedOutput(writerNode);
+ assertUnalignedOutput(sourceNode);
validateTopology(
writerNode,
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
index 88e89e3185e..9a62db2db4d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkV2ITCase.java
@@ -25,8 +25,12 @@ import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.util.ratelimit.GatedRateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiter;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
+import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.Record;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.RecordSerializer;
@@ -87,12 +91,15 @@ public class SinkV2ITCase extends AbstractTestBase {
final Source<Integer, ?, ?> source = createStreamingSource();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ // Introduce the keyBy to assert unaligned checkpoint is enabled on the source ->
+ // sink writer edge
+ .keyBy((KeySelector<Integer, Integer>) value -> value)
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setCommitter(
new TrackingCommitter(committed),
RecordSerializer::new)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(committed.get())
.extracting(Committer.CommitRequest::getCommittable)
.containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE);
@@ -106,13 +113,16 @@ public class SinkV2ITCase extends AbstractTestBase {
final Source<Integer, ?, ?> source = createStreamingSource();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ // Introduce the keyBy to assert unaligned checkpoint is enabled on the source ->
+ // sink writer edge
+ .keyBy((KeySelector<Integer, Integer>) value -> value)
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setCommitter(
new TrackingCommitter(committed),
RecordSerializer::new)
.setWithPreCommitTopology(SinkV2ITCase::flipValue)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(committed.get())
.extracting(Committer.CommitRequest::getCommittable)
.containsExactlyInAnyOrderElementsOf(
@@ -138,12 +148,15 @@ public class SinkV2ITCase extends AbstractTestBase {
IntegerTypeInfo.INT_TYPE_INFO);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ // Introduce the rebalance to assert unaligned checkpoint is enabled on the source
+ // -> sink writer edge
+ .rebalance()
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setCommitter(
new TrackingCommitter(committed),
RecordSerializer::new)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(committed.get())
.extracting(Committer.CommitRequest::getCommittable)
.containsExactlyInAnyOrderElementsOf(EXPECTED_COMMITTED_DATA_IN_BATCH_MODE);
@@ -162,13 +175,16 @@ public class SinkV2ITCase extends AbstractTestBase {
IntegerTypeInfo.INT_TYPE_INFO);
env.fromSource(source, WatermarkStrategy.noWatermarks(), "source")
+ // Introduce the rebalance to assert unaligned checkpoint is enabled on the source
+ // -> sink writer edge
+ .rebalance()
.sinkTo(
TestSinkV2.<Integer>newBuilder()
.setCommitter(
new TrackingCommitter(committed),
RecordSerializer::new)
.setWithPreCommitTopology(SinkV2ITCase::flipValue)
.build());
- env.execute();
+ executeAndVerifyStreamGraph(env);
assertThat(committed.get())
.extracting(Committer.CommitRequest::getCommittable)
.containsExactlyInAnyOrderElementsOf(
@@ -190,6 +206,32 @@ public class SinkV2ITCase extends AbstractTestBase {
return env;
}
+ private void executeAndVerifyStreamGraph(StreamExecutionEnvironment env) throws Exception {
+ StreamGraph streamGraph = env.getStreamGraph();
+ assertNoUnalignedCheckpointInSink(streamGraph);
+ assertUnalignedCheckpointInNonSink(streamGraph);
+ env.execute(streamGraph);
+ }
+
+ private void assertNoUnalignedCheckpointInSink(StreamGraph streamGraph) {
+ // all the out edges between sink nodes should not support unaligned checkpoints
+ assertThat(streamGraph.getStreamNodes())
+ .filteredOn(t -> t.getOperatorName().contains("Sink"))
+ .flatMap(StreamNode::getOutEdges)
+ .allMatch(e -> !e.supportsUnalignedCheckpoints())
+ .isNotEmpty();
+ }
+
+ private void assertUnalignedCheckpointInNonSink(StreamGraph streamGraph) {
+ // All connections between source and sink are keyBy or rebalance, so all the out edges
+ // of nodes upstream of the sink should support unaligned checkpoints
+ assertThat(streamGraph.getStreamNodes())
+ .filteredOn(t -> !t.getOperatorName().contains("Sink"))
+ .flatMap(StreamNode::getOutEdges)
+ .allMatch(StreamEdge::supportsUnalignedCheckpoints)
+ .isNotEmpty();
+ }
+
/**
* A stream source that: 1) emits a list of elements without allowing checkpoints, 2) then waits
* for two more checkpoints to complete, 3) then re-emits the same elements before 4) waiting