This is an automated email from the ASF dual-hosted git repository.
RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f33b8211e8d [FLINK-39388][tests] Fix flaky DataGeneratorSourceITCase#testGatedRateLimiter (#27883)
f33b8211e8d is described below
commit f33b8211e8d0fe65186b1c6ea176d5b2399fe346
Author: Feat Zhang <[email protected]>
AuthorDate: Sat Apr 18 14:09:48 2026 +0800
[FLINK-39388][tests] Fix flaky DataGeneratorSourceITCase#testGatedRateLimiter (#27883)
Co-authored-by: Yuepeng Pan <[email protected]>
---
.../datagen/source/DataGeneratorSourceITCase.java | 32 +++++++++++++++++++---
1 file changed, 28 insertions(+), 4 deletions(-)
diff --git a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
index 2aa1468caea..37aa4e40cff 100644
--- a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
+++ b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.datagen.source;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
@@ -254,21 +255,44 @@ class DataGeneratorSourceITCase extends TestLogger {
}
}
+ /**
+ * A filter that only passes through elements received before the first checkpoint completes.
+ *
+ * <p>The filter stops collecting elements in {@link #notifyCheckpointComplete(long)} rather
+ * than in {@link #snapshotState(FunctionSnapshotContext)}, to avoid a race condition where the
+ * checkpoint barrier arrives at this operator before all upstream elements (emitted in the same
+ * checkpoint cycle) have been processed. Using {@code notifyCheckpointComplete} ensures that
+ * the checkpoint has fully propagated through the pipeline before we stop collecting.
+ */
private static class FirstCheckpointFilter
- implements FlatMapFunction<Long, Long>, CheckpointedFunction {
+ implements FlatMapFunction<Long, Long>, CheckpointedFunction,
CheckpointListener {
- private volatile boolean firstCheckpoint = true;
+ private volatile boolean firstCheckpointCompleted = false;
+ private long firstCheckpointId = Long.MIN_VALUE;
@Override
public void flatMap(Long value, Collector<Long> out) throws Exception {
- if (firstCheckpoint) {
+ if (!firstCheckpointCompleted) {
out.collect(value);
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws
Exception {
- firstCheckpoint = false;
+ // Record the ID of the first checkpoint so we can stop collecting when it completes.
+ if (firstCheckpointId == Long.MIN_VALUE) {
+ firstCheckpointId = context.getCheckpointId();
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws
Exception {
+ // Stop collecting elements once the first checkpoint has completed.
+ if (!firstCheckpointCompleted
+ && checkpointId >= firstCheckpointId
+ && firstCheckpointId != Long.MIN_VALUE) {
+ firstCheckpointCompleted = true;
+ }
}
@Override