[ 
https://issues.apache.org/jira/browse/FLINK-39388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuepeng Pan resolved FLINK-39388.
---------------------------------
    Fix Version/s: 2.4.0
         Assignee: featzhang
       Resolution: Fixed

Merged into master (2.4) via: f33b8211e8d0fe65186b1c6ea176d5b2399fe346

> Fix Flaky DataGeneratorSourceITCase#testGatedRateLimiter
> --------------------------------------------------------
>
>                 Key: FLINK-39388
>                 URL: https://issues.apache.org/jira/browse/FLINK-39388
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 2.0.2
>            Reporter: featzhang
>            Assignee: featzhang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 2.4.0
>
>
> {{DataGeneratorSourceITCase#testGatedRateLimiter}} is a flaky test due to a 
> race condition in the {{FirstCheckpointFilter}} inner class.
> ----
> h2. Description
> The integration test {{DataGeneratorSourceITCase#testGatedRateLimiter}} fails 
> intermittently in CI (observed in build 
> [#73815|https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=73815]
>  triggered by PR [#27351|https://github.com/apache/flink/pull/27351]).
> h3. Root Cause
> The test uses an inner class {{FirstCheckpointFilter}} to collect only the 
> elements emitted before the first checkpoint. In the original implementation, 
> {{FirstCheckpointFilter}} stops collecting inside {{snapshotState()}}:
> {code:java}
> // BEFORE (buggy)
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     firstCheckpoint = false;   // stops collecting immediately on snapshot
> }
> {code}
> This creates a race condition:
> # {{GatedRateLimiter}} completes its first cycle immediately (by design) and 
> emits {{capacityPerCheckpoint = 8}} elements without waiting for a checkpoint.
> # These 8 elements travel over the network to {{FirstCheckpointFilter}}.
> # If the checkpoint barrier arrives at {{FirstCheckpointFilter}} *before* all 
> 8 elements have been processed, {{snapshotState()}} is called prematurely and 
> {{firstCheckpoint}} is set to {{false}}.
> # Any of the 8 elements processed after that point are dropped, so the final
> result contains fewer than 8 elements.
> # The assertion {{assertThat(results).hasSize(capacityPerCheckpoint)}} fails.
> The failure is non-deterministic: whether it occurs depends on scheduling,
> network latency, and system load in the CI environment, which makes the test
> flaky.
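> The sequence above can be replayed without a cluster. The following is a
> minimal, Flink-free sketch, not the actual test code: {{BuggyCutoffSimulation}},
> the {{Event}}/{{Kind}} types and both helper methods are hypothetical, and a
> {{SNAPSHOT}} event stands in for the {{snapshotState()}} call. It assumes a
> timing in which the barrier is processed after only 5 of the 8 first-cycle
> elements.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical, Flink-free model of the BEFORE FirstCheckpointFilter (not the real test code).
> public class BuggyCutoffSimulation {
>
>     // What the filter observes: a record, or the snapshotState() call triggered by the barrier.
>     enum Kind { RECORD, SNAPSHOT }
>
>     record Event(Kind kind, long value) {}
>
>     // Mirrors the BEFORE code: collection stops as soon as snapshotState() runs.
>     static List<Long> runBuggyFilter(List<Event> events) {
>         boolean firstCheckpoint = true;
>         List<Long> results = new ArrayList<>();
>         for (Event e : events) {
>             if (e.kind() == Kind.SNAPSHOT) {
>                 firstCheckpoint = false; // snapshotState()
>             } else if (firstCheckpoint) {
>                 results.add(e.value()); // record collected before the cutoff
>             }
>         }
>         return results;
>     }
>
>     // First-cycle records 0..capacity-1, with the barrier processed after `beforeBarrier` of them.
>     static List<Event> interleaving(int capacity, int beforeBarrier) {
>         List<Event> events = new ArrayList<>();
>         for (int i = 0; i < capacity; i++) {
>             if (i == beforeBarrier) {
>                 events.add(new Event(Kind.SNAPSHOT, -1));
>             }
>             events.add(new Event(Kind.RECORD, i));
>         }
>         if (beforeBarrier >= capacity) {
>             events.add(new Event(Kind.SNAPSHOT, -1));
>         }
>         return events;
>     }
>
>     public static void main(String[] args) {
>         int capacityPerCheckpoint = 8;
>         List<Long> results = runBuggyFilter(interleaving(capacityPerCheckpoint, 5));
>         // Prints 5, so assertThat(results).hasSize(capacityPerCheckpoint) would fail.
>         System.out.println(results.size());
>     }
> }
> {code}
> Only when the barrier trails all 8 elements ({{interleaving(8, 8)}}) does this
> model collect the full first cycle.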
> h3. Fix
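> Replace the {{snapshotState}}-based cutoff with a
> {{notifyCheckpointComplete}}-based cutoff by having {{FirstCheckpointFilter}}
> also implement {{CheckpointListener}}. {{snapshotState()}} now only records
> the ID of the first checkpoint, and collection stops once
> {{notifyCheckpointComplete}} reports that this checkpoint has completed. The
> {{notifyCheckpointComplete}} callback is invoked only after the checkpoint has
> been fully acknowledged by all operators, which guarantees that all elements
> emitted in the same checkpoint cycle have already been processed downstream.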
> {code:java}
> // AFTER (fixed)
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     // Record the ID of the first checkpoint so we can stop collecting when it completes.
>     if (firstCheckpointId == Long.MIN_VALUE) {
>         firstCheckpointId = context.getCheckpointId();
>     }
> }
>
> @Override
> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>     // Stop collecting elements once the first checkpoint has completed.
>     if (firstCheckpointId != Long.MIN_VALUE && checkpointId >= firstCheckpointId) {
>         firstCheckpointCompleted = true;
>     }
> }
> {code}
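> The fixed cutoff can be illustrated with a minimal, Flink-free sketch (not the
> real test code; {{FixedCutoffSimulation}}, {{Event}}, {{Kind}} and
> {{scenario()}} are hypothetical). {{SNAPSHOT}} and {{COMPLETE}} events stand in
> for {{snapshotState()}} and {{notifyCheckpointComplete()}} and carry the
> checkpoint ID. The scenario assumes the barrier overtakes 3 of the 8
> first-cycle elements and, as the fix relies on, that the completion
> notification is observed only after those elements, followed by a second cycle
> of 8 elements that must be dropped.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical, Flink-free model of the AFTER FirstCheckpointFilter (not the real test code).
> public class FixedCutoffSimulation {
>
>     enum Kind { RECORD, SNAPSHOT, COMPLETE }
>
>     // value is the record payload for RECORD, or the checkpoint ID for SNAPSHOT/COMPLETE.
>     record Event(Kind kind, long value) {}
>
>     static List<Long> runFixedFilter(List<Event> events) {
>         long firstCheckpointId = Long.MIN_VALUE; // sentinel: no snapshot seen yet
>         boolean firstCheckpointCompleted = false;
>         List<Long> results = new ArrayList<>();
>         for (Event e : events) {
>             switch (e.kind()) {
>                 case SNAPSHOT -> {
>                     // snapshotState(): remember only the first checkpoint ID
>                     if (firstCheckpointId == Long.MIN_VALUE) {
>                         firstCheckpointId = e.value();
>                     }
>                 }
>                 case COMPLETE -> {
>                     // notifyCheckpointComplete(): stop once the first checkpoint has completed
>                     if (firstCheckpointId != Long.MIN_VALUE && e.value() >= firstCheckpointId) {
>                         firstCheckpointCompleted = true;
>                     }
>                 }
>                 case RECORD -> {
>                     if (!firstCheckpointCompleted) {
>                         results.add(e.value());
>                     }
>                 }
>             }
>         }
>         return results;
>     }
>
>     // Barrier after 5 of 8 first-cycle records, completion of checkpoint 1, then a second cycle.
>     static List<Event> scenario() {
>         List<Event> events = new ArrayList<>();
>         for (int i = 0; i < 5; i++) {
>             events.add(new Event(Kind.RECORD, i));
>         }
>         events.add(new Event(Kind.SNAPSHOT, 1));
>         for (int i = 5; i < 8; i++) {
>             events.add(new Event(Kind.RECORD, i));
>         }
>         events.add(new Event(Kind.COMPLETE, 1));
>         for (int i = 8; i < 16; i++) {
>             events.add(new Event(Kind.RECORD, i));
>         }
>         return events;
>     }
>
>     public static void main(String[] args) {
>         // Prints 8: records 0..7 are kept, the second cycle (8..15) is dropped.
>         System.out.println(runFixedFilter(scenario()).size());
>     }
> }
> {code}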
> ----
> h2. Steps to Reproduce
> Run {{DataGeneratorSourceITCase#testGatedRateLimiter}} repeatedly under load 
> or in a slow CI environment. The test will occasionally fail with:
> {noformat}
> org.opentest4j.AssertionFailedError:
> expected: a collection with size 8
>  but was: a collection with size <N> (N < 8)
> {noformat}
> ----
> h2. Expected Behavior
> The test passes consistently regardless of checkpoint timing.
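> "Regardless of checkpoint timing" can be checked exhaustively in a simplified
> model. The sketch below is hypothetical and Flink-free ({{BarrierPositionSweep}}
> and its methods are not part of the test suite): it places the barrier at
> every possible position within the first cycle of 8 elements, appends the first
> checkpoint's completion and a second cycle, and counts for how many positions
> each cutoff fails to collect exactly 8 elements. It assumes, as the fix does,
> that completion is observed after all first-cycle elements.
> {code:java}
> import java.util.ArrayList;
> import java.util.List;
>
> // Hypothetical sweep over barrier positions; not part of the Flink test suite.
> public class BarrierPositionSweep {
>
>     static final int CAPACITY_PER_CHECKPOINT = 8;
>
>     enum Event { RECORD, SNAPSHOT, COMPLETE }
>
>     // First cycle with the barrier after `beforeBarrier` records, then completion, then a second cycle.
>     static List<Event> events(int beforeBarrier) {
>         List<Event> ev = new ArrayList<>();
>         for (int i = 0; i < CAPACITY_PER_CHECKPOINT; i++) {
>             if (i == beforeBarrier) {
>                 ev.add(Event.SNAPSHOT);
>             }
>             ev.add(Event.RECORD);
>         }
>         if (beforeBarrier == CAPACITY_PER_CHECKPOINT) {
>             ev.add(Event.SNAPSHOT);
>         }
>         ev.add(Event.COMPLETE);
>         for (int i = 0; i < CAPACITY_PER_CHECKPOINT; i++) {
>             ev.add(Event.RECORD);
>         }
>         return ev;
>     }
>
>     // stopOnSnapshot = true models the BEFORE cutoff, false models the AFTER cutoff.
>     static int collected(List<Event> ev, boolean stopOnSnapshot) {
>         boolean collecting = true;
>         int count = 0;
>         for (Event e : ev) {
>             if (e == Event.RECORD && collecting) {
>                 count++;
>             } else if (e == Event.SNAPSHOT && stopOnSnapshot) {
>                 collecting = false;
>             } else if (e == Event.COMPLETE) {
>                 collecting = false;
>             }
>         }
>         return count;
>     }
>
>     // Number of barrier positions (0..8) for which the result size differs from 8.
>     static int failures(boolean stopOnSnapshot) {
>         int failures = 0;
>         for (int k = 0; k <= CAPACITY_PER_CHECKPOINT; k++) {
>             if (collected(events(k), stopOnSnapshot) != CAPACITY_PER_CHECKPOINT) {
>                 failures++;
>             }
>         }
>         return failures;
>     }
>
>     public static void main(String[] args) {
>         System.out.println("BEFORE fails at " + failures(true) + " of 9 positions"); // 8 of 9
>         System.out.println("AFTER fails at " + failures(false) + " of 9 positions"); // 0 of 9
>     }
> }
> {code}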
> ----
> h2. Actual Behavior
> The test fails intermittently when the checkpoint barrier reaches 
> {{FirstCheckpointFilter}} before all upstream elements have been processed.
> ----
> h2. Environment
> - *Flink Version*: 2.0 (master)
> - *Java Version*: 17
> - *OS*: Linux (CI)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
