[ https://issues.apache.org/jira/browse/FLINK-39620?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
haiqingchen updated FLINK-39620:
--------------------------------
Description:
After a task restart or checkpoint recovery, a CUMULATE window aggregation
with local-global (two-phase) aggregation may re-emit historical windows that
were already emitted (a.k.a. "backfill"). The root cause is an inconsistency
between two watermarks during the recovery warm-up period:
SlicingWindowOperator.currentWatermark is restored from state, but
InternalTimerService.currentWatermark stays at Long.MIN_VALUE until the first
post-recovery watermark arrives. During this warm-up gap, GlobalAggCombiner
uses the stale timer-service watermark to decide whether to register a window
timer. It therefore treats historical windows as "not fired" and re-registers
their timers. Once the real watermark advances, those timers fire immediately
and, for cumulative windows, start a chain of fireWindow -> nextTriggerWindow
calls.
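The decision described above can be sketched as a small model. It assumes UTC (no shiftTimeZone conversion) and an assumed firing rule in which a window ending at windowEnd counts as fired once the watermark reaches windowEnd - 1. DualWatermarkModel, isWindowFired and shouldRegisterTimer are hypothetical names, not Flink classes. The sketch only shows that the same historical window gets opposite answers from the restored operator watermark and from the not-yet-advanced timer-service watermark.

{code:java}
/**
 * Hypothetical model of the timer-registration decision; not Flink code.
 * Assumes UTC and the rule "window [start, end) is fired once watermark >= end - 1".
 */
final class DualWatermarkModel {

    /** Assumed firing rule for a window ending at windowEnd. */
    static boolean isWindowFired(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /** Register a timer only if, judged by the given watermark, the window has not fired yet. */
    static boolean shouldRegisterTimer(long windowEnd, long watermark) {
        return !isWindowFired(windowEnd, watermark);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long restoredOperatorWatermark = 10 * minute; // restored from state (00:10)
        long timerServiceWatermark = Long.MIN_VALUE;  // not advanced yet after recovery
        long oldWindowEnd = 3 * minute;               // window [00:00, 00:03) already emitted

        // Judged by the restored operator watermark: already fired, so no timer.
        System.out.println(shouldRegisterTimer(oldWindowEnd, restoredOperatorWatermark)); // false
        // Judged by the stale timer-service watermark: "not fired", so a timer is re-registered.
        System.out.println(shouldRegisterTimer(oldWindowEnd, timerServiceWatermark));     // true
    }
}
{code}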
*Steps to Reproduce*
Run a CUMULATE window aggregation with local-global (two-phase) aggregation, e.g.:
{code:sql}
SELECT window_start, window_end, COUNT(*)
FROM TABLE(
  CUMULATE(TABLE T, DESCRIPTOR(rowtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
GROUP BY window_start, window_end;
{code}
1. Let some cumulative windows fire normally.
2. Take a savepoint or trigger a task restart.
3. After recovery, before the first watermark arrives, feed a late record that
belongs to an already-fired slice but still falls within the max window (the
late-but-keep path).
4. Advance the watermark.

Result: an old window [window_start, oldEnd) is emitted again after recovery.
Several subsequent cumulative windows may also be re-emitted because of
nextTriggerWindow chaining.
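The chaining in the result can be illustrated with a hypothetical simulation. It assumes UTC, a single window start at 0, a 1-minute step, a 1-day max size, and that every slice holds data (the real nextTriggerWindow may also consider whether a window is empty). CumulativeChainModel and firedWindowEnds are illustrative names, not Flink API. With a stale timer for [00:00, 00:03) and a watermark that jumps to 00:06, the model fires [00:00, 00:03) through [00:00, 00:06) in a single advance.

{code:java}
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical simulation of the fireWindow -> nextTriggerWindow chain for one
 * CUMULATE window start; not Flink code. Assumes UTC and non-empty slices.
 */
final class CumulativeChainModel {

    /** Returns the ends of the windows [windowStart, end) fired, starting from a timer at firstEnd. */
    static List<Long> firedWindowEnds(long windowStart, long step, long maxSize,
                                      long firstEnd, long watermark) {
        List<Long> ends = new ArrayList<>();
        long maxEnd = windowStart + maxSize;
        long end = firstEnd;
        // A timer for `end` fires once watermark >= end - 1. Firing registers the next
        // cumulative window, which fires at once if the watermark is already past it too.
        while (end <= maxEnd && watermark >= end - 1) {
            ends.add(end);
            end += step; // next cumulative window: extend by one step
        }
        return ends;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long day = 24 * 60 * minute;
        // Stale timer for the already-emitted window [00:00, 00:03); watermark jumps to 00:06.
        List<Long> ends = firedWindowEnds(0L, minute, day, 3 * minute, 6 * minute);
        for (long end : ends) {
            System.out.println("[00:00, +" + end / minute + " min)");
        }
    }
}
{code}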
*Why it was not caught by FLINK-24501*
FLINK-24501 (and PR #17509) added watermark persistence at the
SlicingWindowOperator level and is-late protection in processElement. However,
the timer-registration path inside GlobalAggCombiner still reads the
timer-service watermark, which that fix does not cover: during the warm-up
period it is still Long.MIN_VALUE.
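The is-late protection in processElement can be modeled as follows, assuming UTC and the same assumed "fired once watermark >= windowEnd - 1" rule. LateRecordModel, Outcome and classify are hypothetical names, not Flink classes. Judged against the restored operator watermark, a record in an already-fired slice whose max window is still open is classified LATE_BUT_KEEP and kept, so it legitimately reaches GlobalAggCombiner. Nothing in this check involves the timer-service watermark that the combiner consults afterwards.

{code:java}
/**
 * Hypothetical model of the is-late check in processElement; not Flink code.
 * Assumes UTC and the rule "window [start, end) is fired once watermark >= end - 1".
 */
final class LateRecordModel {

    enum Outcome { ON_TIME, LATE_BUT_KEEP, DROPPED }

    static boolean isWindowFired(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    /**
     * @param sliceEnd          end of the slice the record is assigned to
     * @param lastWindowEnd     end of the largest (max-size) window containing that slice
     * @param operatorWatermark the watermark restored into the window operator
     */
    static Outcome classify(long sliceEnd, long lastWindowEnd, long operatorWatermark) {
        if (!isWindowFired(sliceEnd, operatorWatermark)) {
            return Outcome.ON_TIME;
        }
        // The slice already fired; keep the record only while the max window is still open.
        return isWindowFired(lastWindowEnd, operatorWatermark) ? Outcome.DROPPED : Outcome.LATE_BUT_KEEP;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long restoredWatermark = 10 * minute;
        // Record in slice [00:02, 00:03) of a 1-day CUMULATE window.
        System.out.println(classify(3 * minute, 24 * 60 * minute, restoredWatermark)); // LATE_BUT_KEEP
    }
}
{code}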
> Cumulative window re-emits old windows after recovery due to stale
> InternalTimerService.currentWatermark in GlobalAggCombiner
> -----------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39620
> URL: https://issues.apache.org/jira/browse/FLINK-39620
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 2.1.1, 1.20.4
> Environment: Flink 1.18.x (reproduced locally);
> logic is the same in 1.20.x and master.
> SQL pattern: CUMULATE with local-global two-phase aggregation.
> Event-time with shiftTimeZone = Asia/Shanghai.
> Reporter: haiqingchen
> Priority: Critical
--
This message was sent by Atlassian Jira
(v8.20.10#820010)