[
https://issues.apache.org/jira/browse/FLINK-39388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yuepeng Pan resolved FLINK-39388.
---------------------------------
Fix Version/s: 2.4.0
Assignee: featzhang
Resolution: Fixed
Merged into master(2.4) via: f33b8211e8d0fe65186b1c6ea176d5b2399fe346
> Fix Flaky DataGeneratorSourceITCase#testGatedRateLimiter
> --------------------------------------------------------
>
> Key: FLINK-39388
> URL: https://issues.apache.org/jira/browse/FLINK-39388
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 2.0.2
> Reporter: featzhang
> Assignee: featzhang
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.4.0
>
>
> {{DataGeneratorSourceITCase#testGatedRateLimiter}} is a flaky test due to a
> race condition in the {{FirstCheckpointFilter}} inner class.
> ----
> h2. Description
> The integration test {{DataGeneratorSourceITCase#testGatedRateLimiter}} fails
> intermittently in CI (observed in build
> [#73815|https://dev.azure.com/apache-flink/98463496-1af2-4620-8eab-a2ecc1a2e6fe/_build/results?buildId=73815]
> triggered by PR [#27351|https://github.com/apache/flink/pull/27351]).
> h3. Root Cause
> The test uses an inner class {{FirstCheckpointFilter}} to collect only the
> elements emitted before the first checkpoint. In the original implementation,
> {{FirstCheckpointFilter}} stops collecting inside {{snapshotState()}}:
> {code:java}
> // BEFORE (buggy)
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     firstCheckpoint = false; // stops collecting immediately on snapshot
> }
> {code}
> This creates a race condition:
> # {{GatedRateLimiter}} completes its first cycle immediately (by design) and
> emits {{capacityPerCheckpoint = 8}} elements without waiting for a checkpoint.
> # These 8 elements travel through the pipeline network to reach
> {{FirstCheckpointFilter}}.
> # If the checkpoint barrier arrives at {{FirstCheckpointFilter}} *before* all
> 8 elements have been processed, {{snapshotState()}} is called prematurely and
> {{firstCheckpoint}} is set to {{false}}.
> # Subsequent elements are dropped, so the final result contains fewer than 8
> elements.
> # The assertion {{assertThat(results).hasSize(capacityPerCheckpoint)}} fails.
> The failure is non-deterministic: it depends on scheduling, network latency,
> and system load in the CI environment, which is typical of a flaky test.
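> The sequence above can be reproduced without Flink in a minimal,
> single-threaded sketch. {{SnapshotCutoffFilter}} and
> {{SnapshotCutoffRaceDemo}} are hypothetical stand-ins, not the test's real
> code: {{processElement}} plays the role of the element path,
> {{snapshotState}} mirrors the buggy cutoff, and the assumed parameter
> {{elementsBeforeBarrier}} models how many of the 8 elements reach the filter
> before the barrier does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver: replays one checkpoint cycle against the stand-in filter.
class SnapshotCutoffRaceDemo {

    // Feeds `capacity` elements and lets the barrier arrive after
    // `elementsBeforeBarrier` of them have been processed.
    static int collectedCount(int capacity, int elementsBeforeBarrier) {
        SnapshotCutoffFilter filter = new SnapshotCutoffFilter();
        for (int i = 0; i < capacity; i++) {
            if (i == elementsBeforeBarrier) {
                filter.snapshotState(); // barrier overtakes the remaining elements
            }
            filter.processElement(i);
        }
        if (elementsBeforeBarrier >= capacity) {
            filter.snapshotState(); // barrier arrives after every element
        }
        return filter.results().size();
    }

    public static void main(String[] args) {
        System.out.println("barrier after 8 elements: " + collectedCount(8, 8));
        System.out.println("barrier after 5 elements: " + collectedCount(8, 5));
    }
}

// Hypothetical stand-in for the original FirstCheckpointFilter (no Flink types).
class SnapshotCutoffFilter {
    private final List<Integer> collected = new ArrayList<>();
    private boolean firstCheckpoint = true;

    // Keeps an element only while the cutoff flag is still set.
    void processElement(int value) {
        if (firstCheckpoint) {
            collected.add(value);
        }
    }

    // Mirrors the buggy snapshotState(): collection stops as soon as the barrier arrives.
    void snapshotState() {
        firstCheckpoint = false;
    }

    List<Integer> results() {
        return collected;
    }
}
```

> When the barrier arrives after 5 of the 8 elements, the sketch collects only
> 5, which corresponds to the {{size <N> (N < 8)}} failure reported for the test.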
> h3. Fix
> Replace the {{snapshotState}}-based cutoff with a
> {{notifyCheckpointComplete}}-based cutoff by implementing
> {{CheckpointListener}}. The {{notifyCheckpointComplete}} callback is invoked
> only after the checkpoint has been fully acknowledged by all operators, which
> guarantees that all elements emitted in the same checkpoint cycle have
> already been processed downstream.
> {code:java}
> // AFTER (fixed)
> @Override
> public void snapshotState(FunctionSnapshotContext context) throws Exception {
>     // Record the ID of the first checkpoint so we can stop collecting when
>     // it completes.
>     if (firstCheckpointId == Long.MIN_VALUE) {
>         firstCheckpointId = context.getCheckpointId();
>     }
> }
>
> @Override
> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>     // Stop collecting elements once the first checkpoint has completed.
>     if (checkpointId >= firstCheckpointId && firstCheckpointId != Long.MIN_VALUE) {
>         firstCheckpointCompleted = true;
>     }
> }
> {code}
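> The effect of the completion-based cutoff can be checked in a plain-Java
> sketch without Flink. {{CompletionCutoffFilter}} and
> {{CompletionCutoffDemo}} are hypothetical stand-ins that copy only the two
> conditions from the snippet above. The sketch assumes that the completion
> notification reaches the filter after all elements of the first cycle and
> before any element of the next one. That ordering is an assumption of the
> sketch, not something it proves about Flink.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical driver: one checkpoint cycle, then elements from a later cycle.
class CompletionCutoffDemo {

    // The barrier arrives after `elementsBeforeBarrier` elements; completion is
    // notified after all `capacity` elements (assumed ordering); afterwards
    // `extraAfterCompletion` elements from a later cycle arrive.
    static int collectedCount(int capacity, int elementsBeforeBarrier, int extraAfterCompletion) {
        CompletionCutoffFilter filter = new CompletionCutoffFilter();
        long checkpointId = 1L;
        for (int i = 0; i < capacity; i++) {
            if (i == elementsBeforeBarrier) {
                filter.snapshotState(checkpointId);
            }
            filter.processElement(i);
        }
        if (elementsBeforeBarrier >= capacity) {
            filter.snapshotState(checkpointId);
        }
        filter.notifyCheckpointComplete(checkpointId);
        for (int i = 0; i < extraAfterCompletion; i++) {
            filter.processElement(capacity + i); // should be dropped
        }
        return filter.results().size();
    }

    public static void main(String[] args) {
        System.out.println("early barrier, 8 late extras: " + collectedCount(8, 5, 8));
    }
}

// Hypothetical stand-in for the fixed FirstCheckpointFilter (no Flink types).
class CompletionCutoffFilter {
    private final List<Integer> collected = new ArrayList<>();
    private long firstCheckpointId = Long.MIN_VALUE;
    private boolean firstCheckpointCompleted = false;

    // Keeps collecting until the first checkpoint is reported complete.
    void processElement(int value) {
        if (!firstCheckpointCompleted) {
            collected.add(value);
        }
    }

    // Only remembers the first checkpoint ID; does not stop collection.
    void snapshotState(long checkpointId) {
        if (firstCheckpointId == Long.MIN_VALUE) {
            firstCheckpointId = checkpointId;
        }
    }

    // Stops collection once the remembered checkpoint (or a later one) completes.
    void notifyCheckpointComplete(long checkpointId) {
        if (firstCheckpointId != Long.MIN_VALUE && checkpointId >= firstCheckpointId) {
            firstCheckpointCompleted = true;
        }
    }

    List<Integer> results() {
        return collected;
    }
}
```

> Under that assumption the sketch collects all 8 first-cycle elements even
> when the barrier arrives after only 5 of them, and it drops the elements that
> arrive after completion.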
> ----
> h2. Steps to Reproduce
> Run {{DataGeneratorSourceITCase#testGatedRateLimiter}} repeatedly under load
> or in a slow CI environment. The test will occasionally fail with:
> {noformat}
> org.opentest4j.AssertionFailedError:
> expected: a collection with size 8
> but was: a collection with size <N> (N < 8)
> {noformat}
> ----
> h2. Expected Behavior
> The test passes consistently regardless of checkpoint timing.
> ----
> h2. Actual Behavior
> The test fails intermittently when the checkpoint barrier reaches
> {{FirstCheckpointFilter}} before all upstream elements have been processed.
> ----
> h2. Environment
> - *Flink Version*: 2.0 (master)
> - *Java Version*: 17
> - *OS*: Linux (CI)