This is an automated email from the ASF dual-hosted git repository.

RocMarshal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f33b8211e8d [FLINK-39388][tests] Fix flaky DataGeneratorSourceITCase#testGatedRateLimiter (#27883)
f33b8211e8d is described below

commit f33b8211e8d0fe65186b1c6ea176d5b2399fe346
Author: Feat Zhang <[email protected]>
AuthorDate: Sat Apr 18 14:09:48 2026 +0800

    [FLINK-39388][tests] Fix flaky DataGeneratorSourceITCase#testGatedRateLimiter (#27883)
    
    Co-authored-by: Yuepeng Pan <[email protected]>
---
 .../datagen/source/DataGeneratorSourceITCase.java  | 32 +++++++++++++++++++---
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
index 2aa1468caea..37aa4e40cff 100644
--- a/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
+++ b/flink-connectors/flink-connector-datagen-test/src/test/java/org/apache/flink/connector/datagen/source/DataGeneratorSourceITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.connector.datagen.source;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
@@ -254,21 +255,44 @@ class DataGeneratorSourceITCase extends TestLogger {
         }
     }
 
+    /**
+     * A filter that only passes through elements received before the first checkpoint completes.
+     *
+     * <p>The filter stops collecting elements in {@link #notifyCheckpointComplete(long)} rather
+     * than in {@link #snapshotState(FunctionSnapshotContext)}, to avoid a race condition where the
+     * checkpoint barrier arrives at this operator before all upstream elements (emitted in the same
+     * checkpoint cycle) have been processed. Using {@code notifyCheckpointComplete} ensures that
+     * the checkpoint has fully propagated through the pipeline before we stop collecting.
+     */
     private static class FirstCheckpointFilter
-            implements FlatMapFunction<Long, Long>, CheckpointedFunction {
+            implements FlatMapFunction<Long, Long>, CheckpointedFunction, CheckpointListener {
 
-        private volatile boolean firstCheckpoint = true;
+        private volatile boolean firstCheckpointCompleted = false;
+        private long firstCheckpointId = Long.MIN_VALUE;
 
         @Override
         public void flatMap(Long value, Collector<Long> out) throws Exception {
-            if (firstCheckpoint) {
+            if (!firstCheckpointCompleted) {
                 out.collect(value);
             }
         }
 
         @Override
         public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            firstCheckpoint = false;
+            // Record the ID of the first checkpoint so we can stop collecting when it completes.
+            if (firstCheckpointId == Long.MIN_VALUE) {
+                firstCheckpointId = context.getCheckpointId();
+            }
+        }
+
+        @Override
+        public void notifyCheckpointComplete(long checkpointId) throws Exception {
+            // Stop collecting elements once the first checkpoint has completed.
+            if (!firstCheckpointCompleted
+                    && checkpointId >= firstCheckpointId
+                    && firstCheckpointId != Long.MIN_VALUE) {
+                firstCheckpointCompleted = true;
+            }
         }
 
         @Override

Reply via email to