[ 
https://issues.apache.org/jira/browse/FLINK-38476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Weiqing Yang closed FLINK-38476.
--------------------------------
    Resolution: Duplicate

> Add FINISHED watermark status to support proper watermark aggregation
> ---------------------------------------------------------------------
>
>                 Key: FLINK-38476
>                 URL: https://issues.apache.org/jira/browse/FLINK-38476
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.16.3, 2.1.0, 1.20.3
>            Reporter: Weiqing Yang
>            Priority: Major
>
> *Environment / Preconditions*
>  * Source: Kafka connector with dynamic partition discovery disabled (no 
> split rescan after startup).
>  * Input topic(s): total partition count = {*}N{*}.
>  * Job parallelism = {*}P{*}, where *P > N* (some source subtasks start with 
> no assigned partitions).
>  * Idle detection configured (e.g., {{{}table.exec.source.idle-timeout = 
> 10s{}}}).
>  * Downstream uses event-time semantics (e.g., {{{}IntervalJoin{}}}, 
> time-bounded aggregations, windows).
> *Problem Summary*
>  * When *P > N* and Kafka partition discovery is disabled, some source 
> subtasks start with no splits and {*}finish immediately{*}. Finished subtasks 
> emit a watermark of {{Long.MAX_VALUE}} but are *not excluded* from watermark 
> aggregation. If the remaining active subtasks later go *IDLE* (e.g., when a 
> lull or backpressure outlasts the idle timeout), the only “non-idle” watermark 
> seen downstream becomes {{{}Long.MAX_VALUE{}}}, which advances operator 
> watermarks to infinity. As a result, {*}all subsequent records are treated 
> as late{*}, cleanup timers fire and clear state, and time-aware operators 
> produce effectively *no output*.
> *Detailed Behavior*
>  * Event-time progress for an operator is the *minimum* watermark across its 
> *non-idle* inputs.
>  * Finished subtasks today:
>  ** set watermark to {{{}Long.MAX_VALUE{}}},
>  ** but are *not* marked idle / excluded from aggregation.
>  * Failure condition:
>  ## Some subtasks finish at startup (no splits).
>  ## Later, active subtasks go *IDLE* due to 
> {{{}table.exec.source.idle-timeout{}}}.
>  ## Aggregation sees only the finished subtasks as “non-idle” → min watermark 
> = {{{}Long.MAX_VALUE{}}}.
>  ## Downstream (e.g., {{{}IntervalJoin{}}}) advances its operator watermark 
> to {{{}Long.MAX_VALUE{}}}.
>  ## All incoming records are “late,” time-bounded join/aggregate can’t match, 
> cleanup timers fire → {*}zero output{*}.
> *Why it reproduces with P > N*
>  * With {*}P > N{*}, at least *P − N* source subtasks receive no splits and 
> finish immediately.
>  * With {*}P == N{*}, each subtask has a split, so the “finished watermark 
> dominates” condition does not arise at startup.
> *Steps to Reproduce (minimal)*
>  # Create a Kafka topic with *N* partitions. Disable the connector’s dynamic 
> partition discovery.
>  # Launch a Flink job with *P > N* source parallelism (e.g., a simple 
> pipeline: KafkaSource → {{IntervalJoin}} or time-windowed op → sink).
>  # Configure idle detection (e.g., {{{}table.exec.source.idle-timeout = 
> 10s{}}}).
>  # Start with some traffic, then pause/slow it enough that active subtasks 
> trip idle timeout.
>  # Observe the downstream operator watermark jump to {{{}Long.MAX_VALUE{}}}, 
> after which records are dropped as late and no output is emitted.
> *Expected Result*
> Finished inputs should be *excluded* from aggregated watermark progression 
> (behave like “non-contributing” channels) until *all* inputs are finished.
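
The aggregation rule described above, and the expected behavior, can be illustrated with a small model. This is only a sketch under stated assumptions, not Flink's actual valve implementation: the class WatermarkAggregationSketch, the Status enum, the Input holder, the NO_PROGRESS sentinel, and both combine methods are hypothetical names introduced here for illustration. Only the rule "minimum watermark across non-idle inputs" and the idea of a FINISHED status come from the issue text.

```java
import java.util.List;

class WatermarkAggregationSketch {

    /** Hypothetical per-input status; FINISHED is the status the issue title proposes. */
    enum Status { ACTIVE, IDLE, FINISHED }

    /** Hypothetical snapshot of one input channel: its status and last watermark. */
    static final class Input {
        final Status status;
        final long watermark;

        Input(Status status, long watermark) {
            this.status = status;
            this.watermark = watermark;
        }
    }

    /** Assumed sentinel meaning "do not advance the operator watermark". */
    static final long NO_PROGRESS = Long.MIN_VALUE;

    /** Reported behavior: only IDLE inputs are skipped, so finished inputs still count. */
    static long combineReported(List<Input> inputs) {
        long min = Long.MAX_VALUE;
        boolean sawNonIdle = false;
        for (Input in : inputs) {
            if (in.status == Status.IDLE) {
                continue;
            }
            sawNonIdle = true;
            min = Math.min(min, in.watermark);
        }
        return sawNonIdle ? min : NO_PROGRESS;
    }

    /** Expected behavior: finished inputs are excluded until all inputs are finished. */
    static long combineExpected(List<Input> inputs) {
        boolean allFinished = !inputs.isEmpty();
        boolean sawActive = false;
        long min = Long.MAX_VALUE;
        for (Input in : inputs) {
            if (in.status != Status.FINISHED) {
                allFinished = false;
            }
            if (in.status == Status.ACTIVE) {
                sawActive = true;
                min = Math.min(min, in.watermark);
            }
        }
        if (allFinished) {
            return Long.MAX_VALUE; // end of input: event time may jump to the maximum
        }
        return sawActive ? min : NO_PROGRESS;
    }

    public static void main(String[] args) {
        // P = 4, N = 2: two subtasks finished at startup, two later went idle.
        List<Input> stalled = List.of(
                new Input(Status.FINISHED, Long.MAX_VALUE),
                new Input(Status.FINISHED, Long.MAX_VALUE),
                new Input(Status.IDLE, 1_000L),
                new Input(Status.IDLE, 1_200L));
        System.out.println("reported: " + combineReported(stalled));
        System.out.println("expected: " + combineExpected(stalled));
    }
}
```

For the stalled scenario in main, combineReported returns Long.MAX_VALUE (the failure described in the issue), while combineExpected returns NO_PROGRESS, so the operator watermark would stay where it was until an active input reports again or every input has finished.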



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
