[ https://issues.apache.org/jira/browse/FLINK-34252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812081#comment-17812081 ]
David Christle commented on FLINK-34252:
----------------------------------------

[~martijnvisser] The switching of the watermark status between active/idle/active/idle appears to happen when the data stream is _not_ idle. The static assignment, temporary no-data, and dynamic assignment cases all describe a stream that genuinely should be signaled as "idle" in some way. Here, by contrast, a stream that is active is erroneously marked idle.

The `WatermarkAssignerOperator` isn't a Source, so it does not know which of those three cases is causing idleness. To detect idleness, the Operator takes `idleTimeout` as an argument and compares it against the processing time at which it received the last record. The way it appears the Operator _should_ work is this: if no record arrives for longer than `idleTimeout`, it infers that the stream is idle and emits `WatermarkStatus.IDLE`. This makes sense. Whether the idleness comes from a static assignment under which no records are received, from a stream that doesn't produce records for a while, or from a split that isn't assigned for a while (due to dynamic assignment or too few splits), all of these cases look the same to the Operator: no records for too long. I believe emitting `WatermarkStatus.IDLE` signals to all downstream operators that this sub-stream is idle and that they should not wait for any more watermarks from it.

The problem is that the code doesn't work like this. When records arrive more often than once per `idleTimeout`, `lastRecordTime` is not tracked correctly: it is only refreshed when the status transitions from IDLE to ACTIVE, not on every record. This triggers the idleness detection when it shouldn't. The stream is not idle, and downstream operators _should_ wait for its watermarks, but the Operator keeps flipping back and forth between emitting `WatermarkStatus.IDLE` and `WatermarkStatus.ACTIVE`.
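To make the difference concrete, here is a minimal, self-contained sketch in plain Java (no Flink dependencies) that models the idleness check under assumed parameters: a record arrives every 10 ms of simulated processing time, a periodic timer evaluates `currentTime - lastRecordTime > idleTimeout` every 50 ms, and `idleTimeout` is 100 ms. `IdleDetectionSketch`, `IdleDetector`, `onRecord`, `onTimer` and `run` are hypothetical names, not the actual `WatermarkAssignerOperator` code; only `lastRecordTime` and `idleTimeout` mirror names used in this issue. The `updateOnEveryRecord = false` variant refreshes `lastRecordTime` only on an IDLE-to-ACTIVE transition, as the issue describes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of processing-time idleness detection; not Flink code. */
public class IdleDetectionSketch {

    enum Status { ACTIVE, IDLE }

    static final class IdleDetector {
        private final long idleTimeout;
        // false models the reported bug, true models the proposed fix
        private final boolean updateOnEveryRecord;
        private long lastRecordTime;
        private Status status = Status.ACTIVE;
        final List<Status> emitted = new ArrayList<>();

        IdleDetector(long idleTimeout, boolean updateOnEveryRecord, long startTime) {
            this.idleTimeout = idleTimeout;
            this.updateOnEveryRecord = updateOnEveryRecord;
            this.lastRecordTime = startTime;
        }

        /** Called for every incoming record at processing time {@code now}. */
        void onRecord(long now) {
            if (status == Status.IDLE) {
                // Leaving IDLE: both variants refresh lastRecordTime here.
                lastRecordTime = now;
                status = Status.ACTIVE;
                emitted.add(Status.ACTIVE);
            } else if (updateOnEveryRecord) {
                // Fixed variant: every record counts as activity.
                lastRecordTime = now;
            }
        }

        /** Called by a periodic processing-time timer. */
        void onTimer(long now) {
            if (status == Status.ACTIVE && now - lastRecordTime > idleTimeout) {
                status = Status.IDLE;
                emitted.add(Status.IDLE);
            }
        }
    }

    /** Records every 10 ms, timer every 50 ms, idleTimeout = 100 ms, for 1 s. */
    static List<Status> run(boolean updateOnEveryRecord) {
        IdleDetector detector = new IdleDetector(100, updateOnEveryRecord, 0);
        for (long t = 10; t <= 1000; t += 10) {
            detector.onRecord(t);
            if (t % 50 == 0) {
                detector.onTimer(t);
            }
        }
        return detector.emitted;
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + run(false));
        System.out.println("fixed: " + run(true));
    }
}
```

In this model the buggy variant emits six IDLE/ACTIVE pairs (IDLE at 150 ms, ACTIVE at 160 ms, IDLE again at 300 ms, and so on) even though no gap between records ever exceeds 10 ms, while the fixed variant emits no status changes at all.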
If I understand correctly, downstream operators that receive an IDLE status from this Operator when they shouldn't will advance their watermarks too early, because IDLE tells them to ignore this sub-stream in their watermark update logic. This breaks the guarantees around watermarks/event time and could cause incorrect results.

Here is the PR I submitted, which might make the issue and the fix clearer: https://github.com/apache/flink/pull/24211

> WatermarkAssignerOperator should not emit WatermarkStatus.IDLE under
> continuous data flow
> -----------------------------------------------------------------------------------------
>
> Key: FLINK-34252
> URL: https://issues.apache.org/jira/browse/FLINK-34252
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.16.3, 1.17.2, 1.18.1
> Reporter: David Christle
> Priority: Major
> Labels: pull-request-available
>
> The WatermarkAssignerOperator in the table runtime incorrectly transitions to
> an IDLE state even when data is continuously flowing. This behavior, observed
> under normal operating conditions where the interval between data elements is
> shorter than the configured idleTimeout, leads to regular transitions between
> ACTIVE and IDLE states, which are unnecessary.
>
> _Detail:_
> In the current implementation, the lastRecordTime variable, which tracks the
> time of the last received data element, is updated only when the
> WatermarkStatus transitions from IDLE to ACTIVE. It is not updated while the
> WatermarkStatus is ACTIVE, which means that even under continuous data flow,
> the condition `(currentTime - lastRecordTime > idleTimeout)` will eventually
> become true, and the WatermarkStatus will erroneously be marked IDLE.
>
> It is unclear to me whether this bug produces any incorrectness downstream,
> since when the WatermarkStatus is in the IDLE state, the next processElement
> call will cause a WatermarkStatus.ACTIVE to be emitted.
> Nevertheless, we should eliminate this flip-flop behavior between states.
>
> The test I wrote fails without the fix and illustrates the flip-flops:
> {noformat}
> [ERROR] Tests run: 4, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.030 s <<< FAILURE! -- in org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest
> [ERROR] org.apache.flink.table.runtime.operators.wmassigners.WatermarkAssignerOperatorTest.testIdleStateAvoidanceWithConsistentDataFlow -- Time elapsed: 0.013 s <<< FAILURE!
> java.lang.AssertionError:
> Expecting
>   [WatermarkStatus(IDLE),
>     WatermarkStatus(ACTIVE),
>     WatermarkStatus(IDLE),
>     WatermarkStatus(ACTIVE),
>     WatermarkStatus(IDLE),
>     WatermarkStatus(ACTIVE),
>     WatermarkStatus(IDLE),
>     WatermarkStatus(ACTIVE),
>     WatermarkStatus(IDLE)]
> not to contain
>   [WatermarkStatus(IDLE)]
> but found
>   [WatermarkStatus(IDLE)]
> {noformat}
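The downstream concern raised in the comment (a spurious IDLE letting downstream watermarks advance too early) can be illustrated with a simplified model. It assumes a downstream operator computes its combined watermark as the minimum over inputs that are not idle, which is roughly how Flink combines input watermarks, but it leaves out the logic for re-aligning an input that becomes active again. `CombinedWatermarkSketch` and `combinedWatermark` are hypothetical names, and returning `Long.MIN_VALUE` when every input is idle is only a convention of this sketch.

```java
/** Hypothetical model of combining per-input watermarks; not Flink code. */
public class CombinedWatermarkSketch {

    /**
     * Returns the minimum watermark over inputs that are not idle, or
     * Long.MIN_VALUE ("nothing to emit" in this sketch) if all inputs are idle.
     */
    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Input 0 is at watermark 100; input 1 lags behind at 50.
        long[] watermarks = {100L, 50L};
        // Both inputs active: the lagging input holds the combined watermark at 50.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}));
        // Input 1 (wrongly) reported IDLE: the combined watermark jumps to 100.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, true}));
    }
}
```

With both inputs active the model prints 50; once the lagging input is treated as idle it prints 100, so a record from that input with an event time between 50 and 100 could be considered late by event-time operators such as windows.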