[ https://issues.apache.org/jira/browse/FLINK-39481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser updated FLINK-39481:
-----------------------------------
    Description: 
With {{execution.checkpointing.unaligned.interruptible-timers.enabled=true}}, 
watermark processing goes through {{MailboxWatermarkProcessor}}: the watermark 
is forwarded downstream only after the timer-firing chain completes, and when 
firing is interrupted the continuation is scheduled as a deferrable mail 
(FLINK-35528, since 1.20.0).
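To make the deferral concrete, here is a toy Java model. It is not Flink code: the class InterruptibleWatermarkModel, its Mail record and the interruptAfter knob are all hypothetical. It fires the timers due at a watermark one at a time, stands in for the real interruption condition by yielding after a fixed number of timers, queues the rest of the chain as a deferrable mail, and forwards the watermark only once the chain has completed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

// Toy model, NOT Flink code: every name here is hypothetical.
final class InterruptibleWatermarkModel {
    /** A mailbox entry, modelled on the deferrable mails described above. */
    record Mail(String name, boolean deferrable, Runnable action) {}

    final Deque<Mail> mailbox = new ArrayDeque<>();
    final PriorityQueue<Long> timers = new PriorityQueue<>(); // event-time timers, earliest first
    final List<String> output = new ArrayList<>();            // what goes downstream, in order
    private final int interruptAfter; // hypothetical stand-in for the real interruption condition

    InterruptibleWatermarkModel(int interruptAfter) {
        if (interruptAfter < 1) {
            throw new IllegalArgumentException("interruptAfter must be >= 1");
        }
        this.interruptAfter = interruptAfter;
    }

    /** Fires all timers due at the watermark, then forwards the watermark. */
    void processWatermark(long watermark) {
        int fired = 0;
        while (!timers.isEmpty() && timers.peek() <= watermark) {
            if (fired == interruptAfter) {
                // Interrupted: queue the rest of the chain as a deferrable mail
                // and return WITHOUT forwarding the watermark.
                mailbox.addLast(new Mail("continue@" + watermark, true,
                        () -> processWatermark(watermark)));
                return;
            }
            output.add("fire " + timers.poll());
            fired++;
        }
        // Chain complete: only now is the watermark forwarded downstream.
        output.add("watermark " + watermark);
    }

    /** Normal mailbox loop: runs every mail, deferrable or not, in FIFO order. */
    void runMailboxLoop() {
        Mail mail;
        while ((mail = mailbox.pollFirst()) != null) {
            mail.action().run();
        }
    }

    public static void main(String[] args) {
        InterruptibleWatermarkModel op = new InterruptibleWatermarkModel(2);
        op.timers.addAll(List.of(10L, 20L, 30L));
        op.processWatermark(100L);
        System.out.println(op.output); // [fire 10, fire 20]  (watermark held back)
        op.runMailboxLoop();
        System.out.println(op.output); // [fire 10, fire 20, fire 30, watermark 100]
    }
}
```

As long as the regular mailbox loop eventually runs the deferrable continuation, the watermark still arrives downstream after all timers it covers.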

Deferrable mails are skipped by {{tryYield()}}, including in the mailbox drain 
performed by {{StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator}}. When 
EndOfData arrives while such a continuation is pending, the operators finish and 
EndOfData is forwarded downstream before the watermark. Downstream operators 
then finish at the old watermark and silently discard the state of windows that 
only the deferred watermark would have fired. This is an exactly-once violation 
(data loss).
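A second toy model (again hypothetical, not Flink's MailboxExecutor or StreamOperatorWrapper) reproduces the ordering problem under these assumptions: tryYield() runs only non-deferrable mails, and the end-of-input path drains the mailbox with tryYield() and then forwards EndOfData. A pending deferrable continuation that would forward the watermark can therefore only run after EndOfData.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Toy model, NOT Flink code: every name here is hypothetical.
final class EndOfInputOrderingModel {
    record Mail(String name, boolean deferrable, Runnable action) {}

    final Deque<Mail> mailbox = new ArrayDeque<>();
    final List<String> downstream = new ArrayList<>(); // events sent downstream, in order

    /** Runs the first non-deferrable mail, if any; deferrable mails are skipped. */
    boolean tryYield() {
        Iterator<Mail> it = mailbox.iterator();
        while (it.hasNext()) {
            Mail mail = it.next();
            if (!mail.deferrable()) {
                it.remove();
                mail.action().run();
                return true;
            }
        }
        return false;
    }

    /** End of input: drain with tryYield(), finish, then forward EndOfData. */
    void endOfInput() {
        while (tryYield()) {
            // keep draining; deferrable mails stay in the mailbox
        }
        downstream.add("EndOfData");
    }

    /** Regular mailbox loop: runs any remaining mail, deferrable or not. */
    void runRemainingMails() {
        Mail mail;
        while ((mail = mailbox.pollFirst()) != null) {
            mail.action().run();
        }
    }

    static List<String> scenario() {
        EndOfInputOrderingModel task = new EndOfInputOrderingModel();
        // Interrupted timer chain: its continuation, which would forward the
        // end-of-input watermark, is still pending when EndOfData arrives.
        task.mailbox.add(new Mail("timer-continuation", true,
                () -> task.downstream.add("Watermark(MAX)")));
        // An ordinary mail that tryYield() does drain.
        task.mailbox.add(new Mail("regular", false,
                () -> task.downstream.add("output-of-regular-mail")));
        task.endOfInput();
        task.runRemainingMails();
        return task.downstream;
    }

    public static void main(String[] args) {
        // [output-of-regular-mail, EndOfData, Watermark(MAX)]
        System.out.println(scenario());
    }
}
```

In this model EndOfData overtakes the watermark purely because the drain skips deferrable mails; it illustrates the mechanism only and is not a proposed fix.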

This surfaced as flakiness in {{WindowDistinctAggregateITCase}} (and 
FLINK-39930): the CI configuration randomization occasionally enables unaligned 
checkpoints with {{aligned-checkpoint-timeout=0}} plus interruptible timers, in 
which case the entire trailing window family (fired only by the end-of-input 
MAX_WATERMARK) goes missing. Analysis of all linked failing builds showed a 13% 
failure rate for this test under that configuration. The checkpoint+restore in 
the test is incidental: the loss reproduces without any failover. For the full 
root-cause analysis, reproducer, and causal verification, see 
[this comment|https://issues.apache.org/jira/browse/FLINK-39481?focusedCommentId=18093085&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-18093085] 
and the follow-ups.



h2. Original Jira ticket content:

The test `WindowDistinctAggregateITCase#testCumulateWindow_GroupingSets` is 
flaky and fails intermittently in CI due to a race condition in the test 
framework itself.

1. Failure Evidence

Azure Build #74194 (2026-04-17):
 - Run 1: PASS
 - Run 2: PASS
 - Run 3: FAIL ← flaky
 - Run 4: PASS

Error message:
org.opentest4j.AssertionFailedError:
Expected 23 rows but got 14 rows.

Missing rows (all related to window [2020-10-10T00:00:30, ...]):
 - 0,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1
 - 0,b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1
 - 0,b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1
 - 0,null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0
 - 0,null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0
 - 0,null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0
 - 1,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2,11.10,7.0,3.0,1
 - 1,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2,11.10,7.0,3.0,1
 - 1,null,2020-10-10T00:00:30,2020-10-10T00:00:45,2,11.10,7.0,3.0,1
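The nine missing rows are three result rows times the three cumulative windows [00:00:30, 00:00:35), [00:00:30, 00:00:40) and [00:00:30, 00:00:45). The Java sketch below assigns an event to its cumulate windows and checks when they fire. It assumes a 5 s step and a 15 s maximum size (inferred from the window ends above, not read from the test code) and the convention that a window [start, end) fires once the watermark reaches end - 1 ms. The class name and the stale watermark value 00:00:33 are hypothetical. Both 00:00:32 and 00:00:34 land in all three windows, so a watermark that never reaches the first window end fires none of them, while MAX_WATERMARK fires all three.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Sketch only: step and size are ASSUMED from the missing window ends.
final class CumulateWindows {
    static final long STEP_MS = 5_000;  // assumed cumulate step
    static final long SIZE_MS = 15_000; // assumed maximum window size

    /** Ends of all cumulate windows [start, end) that contain the event. */
    static List<Instant> windowEnds(Instant event) {
        long ts = event.toEpochMilli();
        long start = Math.floorDiv(ts, SIZE_MS) * SIZE_MS; // aligned to the max size
        List<Instant> ends = new ArrayList<>();
        for (long end = start + STEP_MS; end <= start + SIZE_MS; end += STEP_MS) {
            if (end > ts) { // the event must lie before the window end
                ends.add(Instant.ofEpochMilli(end));
            }
        }
        return ends;
    }

    /** A window [start, end) fires once the watermark reaches end - 1 ms. */
    static boolean fires(Instant windowEnd, long watermarkMs) {
        return watermarkMs >= windowEnd.toEpochMilli() - 1;
    }

    public static void main(String[] args) {
        List<Instant> ends = windowEnds(Instant.parse("2020-10-10T00:00:32Z"));
        // [2020-10-10T00:00:35Z, 2020-10-10T00:00:40Z, 2020-10-10T00:00:45Z]
        System.out.println(ends);
        long staleWatermark = Instant.parse("2020-10-10T00:00:33Z").toEpochMilli(); // hypothetical
        for (Instant end : ends) {
            System.out.println(end + " fires at stale watermark? " + fires(end, staleWatermark)
                    + ", at MAX_WATERMARK? " + fires(end, Long.MAX_VALUE));
        }
    }
}
```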

2. Root Cause Analysis

The test uses `FailingCollectionSource` with `'failing-source' = 'true'`, 
which intentionally throws an artificial exception to exercise the 
checkpoint-restore path.
The source emits 11 rows; the last two events in the dataset have timestamps 
`2020-10-10 00:00:32` and `2020-10-10 00:00:34`.

The flakiness stems from a race condition in `FailingCollectionSource`:

1. The source uses a checkpoint-triggered failure mechanism. When the failure 
occurs, only the rows emitted BEFORE the checkpoint are guaranteed to be 
replayed after restore.

2. The last 2 rows (timestamps 00:00:32 and 00:00:34) advance the watermark 
past `00:00:31`, triggering the cumulate windows ending at 00:00:35/40/45. 
If the failure and restore happen AFTER the source emits rows 
at 00:00:32/00:00:34 but BEFORE the downstream operators flush their state, 
those late-window results can be lost.

3. Specifically, the issue is that `FailingCollectionSource` uses the legacy 
`SourceFunction` API with `ctx.getCheckpointLock()`. Whether 
`numSuccessfulCheckpoints >= 1 && lastCheckpointedEmittedNum >= 1` becomes 
true before or after the last 2 rows are emitted is non-deterministic: 
sometimes the last 2 rows are emitted before the artificial failure is 
triggered (correct case), and sometimes after (data loss case).

3. Impact
 - Flink CI blocked intermittently on the `test_ci table` job
 - 3 out of 4 retry runs pass, demonstrating it's a classic flaky test pattern
 - Affects: WindowDistinctAggregateITCase (CUMULATE window with GROUPING SETS/CUBE/ROLLUP)

> Watermark deferred by interruptible timers is lost at end of input, dropping window results
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-39481
>                 URL: https://issues.apache.org/jira/browse/FLINK-39481
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Task
>    Affects Versions: 2.0.2, 2.3.0, 2.2.1, 1.20.5, 2.1.3, 2.4.0
>            Reporter: featzhang
>            Assignee: Martijn Visser
>            Priority: Critical
>              Labels: pull-request-available, test-stability
>         Attachments: WindowDistinctAggregateITCase.testCumulateWindow_Rollup_SplitDistinct-true_ROCKSDB.FAILED.log



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
