[
https://issues.apache.org/jira/browse/FLINK-39586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39586:
-----------------------------------
Labels: pull-request-available (was: )
> CombinedWatermarkStatus.updateCombinedWatermark() fails to mark idle when
> partialWatermarks is empty
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-39586
> URL: https://issues.apache.org/jira/browse/FLINK-39586
> Project: Flink
> Issue Type: Bug
> Reporter: Chen Zhang
> Priority: Minor
> Labels: pull-request-available
>
> h2. Summary
> {{CombinedWatermarkStatus.updateCombinedWatermark()}} short-circuits with
> {{return false}} when {{partialWatermarks}} is empty, without setting
> {{this.idle = true}}. If any per-split output previously emitted a watermark
> (setting {{idle = false}}), the idle flag becomes permanently stuck at
> {{false}} after all per-split outputs are unregistered. This prevents
> downstream watermark idle propagation.
> h2. Affected Versions
> Flink 1.x (all current 1.x releases). FLINK-38454 (fixed in 2.2.0) addresses
> a related but *different* case — where outputs remain registered but go idle.
> It does *not* fix the case where all outputs are *unregistered* (empty
> {{partialWatermarks}}).
> h2. Steps to Reproduce
> This occurs with {{HybridSource}} (bounded → unbounded) using per-split
> watermark generation:
> # Start a {{HybridSource}} with sourceIndex=0 (bounded, e.g. Iceberg) and
> sourceIndex=1 (unbounded, e.g. Kafka), with a {{WatermarkStrategy}} that uses
> per-split outputs.
> # Take a checkpoint while some subtasks still have active bounded-source
> splits. During processing, {{WatermarkGenerator.onEvent()}} calls
> {{output.emitWatermark()}}, which sets
> {{CombinedWatermarkStatus.idle = false}} on those subtasks' {{splitLocalOutput}}.
> # Restore from that checkpoint. Subtasks re-process remaining bounded splits
> and then finish the bounded phase.
> # Bounded splits complete. {{SourceReaderBase.releaseOutputForSplit()}}
> removes all {{PartialWatermark}} entries from the
> {{WatermarkOutputMultiplexer}}, making {{partialWatermarks}} empty.
> # Unbounded phase starts. Subtasks with no partition assigned have no
> splits and should become idle, but they never do.
> h2. Root Cause
> The bug is in {{CombinedWatermarkStatus.updateCombinedWatermark()}}:
> {code:java}
> if (partialWatermarks.isEmpty()) {
>     return false; // BUG: returns without setting this.idle = true
> }
> {code}
> When all per-split outputs are released (unregistered), {{partialWatermarks}}
> becomes empty. The method returns {{false}} immediately, leaving the {{idle}}
> field at whatever value it was previously set to. If any watermark was
> emitted during the bounded phase, {{idle}} was set to {{false}} and remains
> {{false}} permanently.
> h3. Consequence Chain
> # {{CombinedWatermarkStatus.isIdle()}} returns *false* (stuck)
> # {{WatermarkOutputMultiplexer}} never calls {{splitLocalOutput.markIdle()}}
> # {{splitLocalOutput.isIdle}} stays *false* (was set to false by
> {{emitWatermark()}} during the bounded phase)
> # {{ProgressiveTimestampsAndWatermarks.IdlenessManager.maybeMarkUnderlyingOutputAsIdle()}}
> requires *both* {{splitLocalOutput.isIdle}} *and* {{mainOutput.isIdle}} to
> be true, so the check fails
> # {{WatermarkToDataOutput.markIdle()}} is never called — no
> {{WatermarkStatus.IDLE}} sent downstream
> # Downstream {{StatusWatermarkValve}} waits indefinitely for watermark
> advancement from those subtasks
> # *Global watermark stalls* at the last bounded-source-era watermark value
> h2. Evidence
> * *Observed behavior*: After task-level recovery from a checkpoint taken
> during the bounded phase, watermark progression stalls permanently once all
> subtasks switch to the unbounded source. Watermark stays stuck at
> approximately {{switchPoint - maxOutOfOrderness}}.
> * *JM restart "fixes" it*: A JobManager restart loads a later checkpoint
> (taken after the source switch). Subtasks restore with empty reader state,
> so no bounded-source per-split outputs are created. As a result,
> {{CombinedWatermarkStatus}} never has {{idle}} set to {{false}} by
> {{emitWatermark()}}, {{splitLocalOutput.isIdle}} stays at its initial
> {{true}}, and idle propagation works correctly. This confirms that the root
> cause is stale {{idle = false}} state carried over from the bounded phase.
> h2. Distinction from Related Issues
> ||Scenario||{{partialWatermarks}}||{{idle}} before||Bug triggered?||
> |FLINK-38477 (never had splits)|empty|{{true}} (initial)|No: {{idle}} is already correct|
> |FLINK-38454 (splits go idle but stay registered)|non-empty, all idle|{{false}}|Yes, fixed in 2.2.0|
> |*This bug* (splits unregistered after being active)|*empty* (was non-empty)|*{{false}}* (set by {{emitWatermark}})|*Yes, not fixed by either*|
> h2. Expected Behavior
> When {{partialWatermarks}} becomes empty (all per-split outputs
> unregistered), {{CombinedWatermarkStatus.updateCombinedWatermark()}} should
> set {{this.idle = true}} before returning. An empty set of outputs is
> semantically equivalent to "all outputs are idle."
> h2. Proposed Fix
> {code:java}
> if (partialWatermarks.isEmpty()) {
>     this.idle = true; // No outputs registered - treat as idle
>     return false;
> }
> {code}
> This is consistent with the semantics: if there are no partial watermarks to
> track, the combined status should be idle.
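
The stuck flag and the effect of the proposed fix can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not Flink's actual class: CombinedStatusModel, Partial, markIdleWhenEmpty and idleAfterSplitLifecycle are hypothetical names introduced for illustration, and the initial idle value of true follows the table in the issue description rather than the Flink source.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical stand-in for CombinedWatermarkStatus (not the Flink class). */
class CombinedStatusModel {

    /** Hypothetical per-split state: last watermark and an idle flag. */
    static final class Partial {
        long watermark = Long.MIN_VALUE;
        boolean idle = false;
    }

    private final List<Partial> partialWatermarks = new ArrayList<>();
    private final boolean markIdleWhenEmpty; // true = behave as in the proposed fix
    private long combinedWatermark = Long.MIN_VALUE;
    private boolean idle = true; // assumed initial value, per the issue's table

    CombinedStatusModel(boolean markIdleWhenEmpty) {
        this.markIdleWhenEmpty = markIdleWhenEmpty;
    }

    Partial register() {
        Partial p = new Partial();
        partialWatermarks.add(p);
        return p;
    }

    void unregister(Partial p) {
        partialWatermarks.remove(p);
    }

    boolean isIdle() {
        return idle;
    }

    /** Returns true if the combined watermark advanced. */
    boolean updateCombinedWatermark() {
        if (partialWatermarks.isEmpty()) {
            if (markIdleWhenEmpty) {
                idle = true; // proposed fix: no outputs registered, treat as idle
            }
            return false; // without the fix, idle keeps its previous value
        }
        long min = Long.MAX_VALUE;
        boolean allIdle = true;
        for (Partial p : partialWatermarks) {
            if (!p.idle) {
                min = Math.min(min, p.watermark);
                allIdle = false;
            }
        }
        idle = allIdle;
        if (!allIdle && min > combinedWatermark) {
            combinedWatermark = min;
            return true;
        }
        return false;
    }

    /** Registers one split, emits a watermark, unregisters it, and reports the idle flag. */
    static boolean idleAfterSplitLifecycle(boolean applyFix) {
        CombinedStatusModel status = new CombinedStatusModel(applyFix);
        Partial split = status.register();
        split.watermark = 1_000L;
        status.updateCombinedWatermark(); // active split: idle becomes false
        status.unregister(split);
        status.updateCombinedWatermark(); // empty set: the early-return path
        return status.isIdle();
    }

    public static void main(String[] args) {
        System.out.println("without fix: idle=" + idleAfterSplitLifecycle(false));
        System.out.println("with fix: idle=" + idleAfterSplitLifecycle(true));
    }
}
```

In this model the run without the fix ends with idle=false even though no outputs remain registered, while the run with the fix ends with idle=true. A regression test for the real class could follow the same register, emit, unregister sequence.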