[ https://issues.apache.org/jira/browse/FLINK-35886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Piotr Nowojski closed FLINK-35886.
----------------------------------
    Fix Version/s: 2.0.0
                   1.19.2
                   1.20.1
     Release Note: The way the idleness timeout is calculated for idleness detection has changed. Previously, the time during which a source or a source's split was backpressured or blocked by watermark alignment counted towards the idleness timeout. As a result, sources or individual splits could incorrectly switch to idle while they were unable to make progress but still had records to emit, which in turn could produce incorrectly calculated watermarks and erroneously late data. This has been fixed in 1.19.2, 1.20.1 and 2.0.0. The fix required some API changes, such as the introduction of `org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier.Context#getInputActivityClock`. However, this should not create compatibility problems for users upgrading from prior Flink versions.
       Resolution: Fixed

Merged to 1.20 as [d1fe68e3358..62750753ad4]
Merged to 1.19 as [f10d5e3be03..5da82f8ca4e]

> Incorrect watermark idleness timeout accounting when subtask is
> backpressured/blocked
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-35886
>                 URL: https://issues.apache.org/jira/browse/FLINK-35886
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream, Runtime / Task
>    Affects Versions: 1.18.1, 1.20.0, 1.19.1
>            Reporter: Piotr Nowojski
>            Assignee: Piotr Nowojski
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Currently, when using watermarks with idleness in Flink, idleness can be
> incorrectly detected when reading records from a source that is blocked by
> the runtime. For example, this can easily happen when a source is either
> backpressured or blocked by watermark alignment.
> In those cases, although there are more records to be read from the source
> (or the source’s split), the runtime decides not to poll those records (or is
> unable to). The idleness timeout can then kick in, marking the source or
> source split as idle, which can lead to incorrect combined watermark
> calculations and to records being incorrectly marked as late and dropped.
> h4. Watermark alignment
> Suppose there are two source splits, A and B, and maxAllowedWatermarkDrift is
> set to 30s.
> # Partition A has emitted watermark 1042 sec, while partition B sits at
> watermark 1000 sec.
> # {{1042s - 1000s > maxAllowedWatermarkDrift}}, so partition A is blocked by
> the watermark alignment.
> # For the duration of idleTimeout, partition B emits a large batch of
> records that do not advance that partition's watermark by much. For example,
> the watermark for partition B either stays at 1000s or advances by a small
> amount, to 1005s.
> # idleTimeout kicks in, marking partition A as idle.
> # Partition B finishes emitting the large batch of older records, and suppose
> there is now a gap in rowtimes: partition B was previously emitting records
> with rowtime ~1000s, and now it jumps to, for example, ~5000s.
> # As partition A is idle, the combined watermark jumps to ~5000s as well.
> # Watermark alignment unblocks partition A, and it continues emitting records
> with rowtime ~1042s. But now all of those records are dropped as late.
> h4. Backpressure
> Suppose there are two SourceOperators, A and B. Due to, for example, data
> skew, either only A gets backpressured, or A is backpressured sooner. Either
> way, while A is backpressured and B is not, B can bump the combined watermark
> high enough that, once backpressure recedes, fresh records from A are
> considered late, leading to incorrect results.
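
The difference in timeout accounting described above can be illustrated with a small toy model. This is only a sketch and does not use any Flink API: the class `IdlenessAccountingSketch`, the method `firstIdleSecond`, and the one-second time steps are all hypothetical, and the model simply assumes that the fixed behavior excludes blocked time from the idleness timeout. It is not a description of how Flink implements the fix internally.

```java
import java.util.Arrays;

// Toy model only: hypothetical names, no Flink dependencies.
final class IdlenessAccountingSketch {

    /**
     * Walks through the timeline one second at a time and returns the first
     * second at which the split would be marked idle, or -1 if that never happens.
     * The split is assumed to always have pending records.
     *
     * @param blocked           blocked[t] is true if the split is backpressured
     *                          or blocked by watermark alignment during second t
     * @param idleTimeoutSec    idleness timeout in seconds
     * @param pauseWhileBlocked false models the old accounting (blocked time
     *                          counts as inactivity), true models the assumed
     *                          fixed accounting (blocked time is not counted)
     */
    static long firstIdleSecond(boolean[] blocked, long idleTimeoutSec, boolean pauseWhileBlocked) {
        long inactiveSec = 0; // time counted towards the timeout since the last record
        for (int t = 0; t < blocked.length; t++) {
            if (!blocked[t]) {
                // Not blocked: a pending record is emitted, which resets the timer.
                inactiveSec = 0;
                continue;
            }
            // Blocked: nothing can be emitted during this second.
            if (!pauseWhileBlocked) {
                inactiveSec++;
            }
            if (inactiveSec >= idleTimeoutSec) {
                return t;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        boolean[] blocked = new boolean[60];
        Arrays.fill(blocked, 5, 45, true); // blocked during seconds 5..44
        long idleTimeoutSec = 20;

        // Old accounting: marked idle at t=24, although records are still pending.
        System.out.println("old accounting:   " + firstIdleSecond(blocked, idleTimeoutSec, false));
        // Assumed fixed accounting: never marked idle (-1) in this timeline.
        System.out.println("fixed accounting: " + firstIdleSecond(blocked, idleTimeoutSec, true));
    }
}
```

In this timeline the old accounting marks the split idle 20 seconds into the blocked period, which corresponds to the step in the watermark alignment scenario where partition A is marked idle while it is still blocked.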