[
https://issues.apache.org/jira/browse/FLINK-39481?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Martijn Visser updated FLINK-39481:
-----------------------------------
Description:
With {{execution.checkpointing.unaligned.interruptible-timers.enabled=true}},
watermark processing goes through {{MailboxWatermarkProcessor}}: the watermark
is forwarded downstream only after the timer-firing chain completes, and when
firing is interrupted the continuation is scheduled as a deferrable mail
(FLINK-35528, since 1.20.0).
Deferrable mails are skipped by {{tryYield()}}, including the mailbox drain in
{{StreamOperatorWrapper#quiesceTimeServiceAndFinishOperator}}. When EndOfData
arrives while such a continuation is pending, the operators finish and
EndOfData is forwarded downstream before the watermark. Downstream operators
then finish at the old watermark and silently discard the state of windows that
only fire on that watermark. This is an exactly-once violation (data loss).
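The ordering problem described above can be illustrated with a toy model. This is a minimal sketch, not Flink code: {{Mail}}, {{Mailbox}} and {{simulate}} are hypothetical names, and the only behaviour carried over from the description is that a drain based on {{tryYield()}} skips deferrable mails.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Toy model of the ordering issue; none of these types are Flink classes. */
final class DeferrableMailSketch {

    /** A queued action. Deferrable mails are skipped by tryYield() below. */
    static final class Mail {
        final boolean deferrable;
        final Runnable action;

        Mail(boolean deferrable, Runnable action) {
            this.deferrable = deferrable;
            this.action = action;
        }
    }

    static final class Mailbox {
        private final Deque<Mail> queue = new ArrayDeque<>();

        void put(Mail mail) {
            queue.addLast(mail);
        }

        /** Runs the first non-deferrable mail, if there is one. */
        boolean tryYield() {
            for (Iterator<Mail> it = queue.iterator(); it.hasNext(); ) {
                Mail mail = it.next();
                if (!mail.deferrable) {
                    it.remove();
                    mail.action.run();
                    return true;
                }
            }
            return false;
        }

        /** Runs everything that is still queued, deferrable or not. */
        void runRemaining() {
            while (!queue.isEmpty()) {
                queue.pollFirst().action.run();
            }
        }
    }

    /** Returns the order in which elements reach the downstream operator. */
    static List<String> simulate(boolean continuationIsDeferrable) {
        List<String> downstream = new ArrayList<>();
        Mailbox mailbox = new Mailbox();

        // Timer firing was interrupted: forwarding the watermark is left to a
        // continuation mail.
        mailbox.put(new Mail(continuationIsDeferrable,
                () -> downstream.add("Watermark(MAX_WATERMARK)")));

        // EndOfData arrives: drain the mailbox via tryYield(), then finish
        // the operator and forward EndOfData.
        while (mailbox.tryYield()) {
            // keep draining
        }
        downstream.add("EndOfData");

        // A skipped continuation only runs after EndOfData was forwarded.
        mailbox.runRemaining();
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));  // [EndOfData, Watermark(MAX_WATERMARK)]
        System.out.println(simulate(false)); // [Watermark(MAX_WATERMARK), EndOfData]
    }
}
```

In this model, the deferrable continuation is still queued when the drain returns, so EndOfData overtakes the watermark. The non-deferrable variant is shown only for contrast and is not meant to suggest how the issue is fixed.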
This surfaced as flakiness in {{WindowDistinctAggregateITCase}} (and
FLINK-39930): the CI configuration randomization occasionally enables unaligned
checkpoints with {{aligned-checkpoint-timeout=0}} plus interruptible timers, in
which case the entire trailing window family (fired only by the end-of-input
MAX_WATERMARK) goes missing. Analysis of all linked failing builds showed a 13%
failure rate for this test under that configuration. The checkpoint+restore in
the test is incidental: the loss reproduces without any failover. Full
root-cause analysis, reproducer, and causal verification: see [this
comment|https://issues.apache.org/jira/browse/FLINK-39481?focusedCommentId=18093085&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-18093085]
and the follow-ups.
h2. Original Jira ticket content:
The test `WindowDistinctAggregateITCase#testCumulateWindow_GroupingSets` is
flaky and fails intermittently in CI due to a race condition in the test
framework itself.
1. Failure Evidence
Azure Build #74194 (2026-04-17):
- Run 1: PASS
- Run 2: PASS
- Run 3: FAIL ← flaky
- Run 4: PASS
Error message:
org.opentest4j.AssertionFailedError:
Expected 23 rows but got 14 rows.
Missing rows (all related to window [2020-10-10T00:00:30, ...]):
- 0,b,2020-10-10T00:00:30,2020-10-10T00:00:35,1,3.33,3.0,3.0,1
- 0,b,2020-10-10T00:00:30,2020-10-10T00:00:40,1,3.33,3.0,3.0,1
- 0,b,2020-10-10T00:00:30,2020-10-10T00:00:45,1,3.33,3.0,3.0,1
- 0,null,2020-10-10T00:00:30,2020-10-10T00:00:35,1,7.77,7.0,7.0,0
- 0,null,2020-10-10T00:00:30,2020-10-10T00:00:40,1,7.77,7.0,7.0,0
- 0,null,2020-10-10T00:00:30,2020-10-10T00:00:45,1,7.77,7.0,7.0,0
- 1,null,2020-10-10T00:00:30,2020-10-10T00:00:35,2,11.10,7.0,3.0,1
- 1,null,2020-10-10T00:00:30,2020-10-10T00:00:40,2,11.10,7.0,3.0,1
- 1,null,2020-10-10T00:00:30,2020-10-10T00:00:45,2,11.10,7.0,3.0,1
2. Root Cause Analysis
The test uses `FailingCollectionSource` with `'failing-source' = 'true'`,
which intentionally throws an artificial exception to trigger the
checkpoint-restore path.
The source emits 11 rows; the last two carry the event timestamps
`2020-10-10 00:00:32` and `2020-10-10 00:00:34`.
The flakiness stems from a race condition in `FailingCollectionSource`:
1. The source uses a checkpoint-triggered failure mechanism. When the failure
occurs, only the rows emitted BEFORE the checkpoint are guaranteed to be
replayed after restore.
2. The last 2 rows (timestamps 00:00:32 and 00:00:34) advance the watermark
past `00:00:31`, triggering the cumulate windows ending at 00:00:35/40/45.
If the failure and restore happen AFTER the source emits rows
at 00:00:32/00:00:34 but BEFORE the downstream operators flush their state,
those late-window results can be lost.
3. Specifically, the issue is that `FailingCollectionSource` uses the legacy
`SourceFunction` API with `ctx.getCheckpointLock()`. The timing of when
`numSuccessfulCheckpoints >= 1 && lastCheckpointedEmittedNum >= 1`
is satisfied relative to when the last 2 rows are emitted is non-deterministic,
meaning sometimes the last 2 rows are emitted before the artificial failure is
triggered (correct case), and sometimes after (data loss case).
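The window ends named in item 2 match the missing rows in the error message. The sketch below shows why, under two assumptions that are inferred from those rows and not stated in the ticket: a cumulate step of 5 seconds and a maximum window size of 15 seconds. The assignment rule is a simplified illustration, not Flink's window assigner, and {{CumulateWindowSketch}} is a hypothetical name.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;

/** Simplified cumulate-window arithmetic; not Flink's implementation. */
final class CumulateWindowSketch {

    /** Returns the end of every cumulate window that contains the event. */
    static List<LocalDateTime> windowEnds(
            LocalDateTime eventTime, Duration step, Duration maxSize) {
        long ts = eventTime.toEpochSecond(ZoneOffset.UTC);
        long size = maxSize.getSeconds();

        // All cumulate windows of one family share the same start, aligned
        // to the maximum window size.
        long start = ts - Math.floorMod(ts, size);

        List<LocalDateTime> ends = new ArrayList<>();
        for (long end = start + step.getSeconds();
                end <= start + size;
                end += step.getSeconds()) {
            // The event belongs to each window whose end lies after it.
            if (ts < end) {
                ends.add(LocalDateTime.ofEpochSecond(end, 0, ZoneOffset.UTC));
            }
        }
        return ends;
    }

    public static void main(String[] args) {
        // Step and maximum size are assumptions inferred from the missing rows.
        Duration step = Duration.ofSeconds(5);
        Duration maxSize = Duration.ofSeconds(15);

        LocalDateTime lastEvent = LocalDateTime.of(2020, 10, 10, 0, 0, 34);
        System.out.println(windowEnds(lastEvent, step, maxSize));
        // [2020-10-10T00:00:35, 2020-10-10T00:00:40, 2020-10-10T00:00:45]
    }
}
```

Both trailing events (00:00:32 and 00:00:34) fall into the same three windows starting at 00:00:30, which is the window family absent from the failing run.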
3. Impact
- Flink CI blocked intermittently on the `test_ci table` job
- 3 out of 4 retry runs pass, demonstrating it's a classic flaky test pattern
- Affects: WindowDistinctAggregateITCase (CUMULATE window with GROUPING
SETS/CUBE/ROLLUP)
> Watermark deferred by interruptible timers is lost at end of input, dropping
> window results
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-39481
> URL: https://issues.apache.org/jira/browse/FLINK-39481
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Task
> Affects Versions: 2.0.2, 2.3.0, 2.2.1, 1.20.5, 2.1.3, 2.4.0
> Reporter: featzhang
> Assignee: Martijn Visser
> Priority: Critical
> Labels: pull-request-available, test-stability
> Attachments:
> WindowDistinctAggregateITCase.testCumulateWindow_Rollup_SplitDistinct-true_ROCKSDB.FAILED.log
>
>