[ https://issues.apache.org/jira/browse/FLINK-24501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432190#comment-17432190 ]
Wenlong Lyu commented on FLINK-24501:
-------------------------------------

I think it may be better to make sure that the watermark does not go backwards after restoring from a checkpoint/savepoint, instead of modifying the operator's behavior to cover such an abnormal case. For example, could we add operator state to the watermark assigner, so that it does not produce a wrong watermark after restore?

> Unexpected behavior of cumulate window aggregate for late event after recover from sp/cp
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24501
>                 URL: https://issues.apache.org/jira/browse/FLINK-24501
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: JING ZHANG
>            Assignee: JING ZHANG
>            Priority: Major
>              Labels: pull-request-available
>
> *Problem description*
> After recovering from a savepoint or checkpoint, the cumulate window aggregate may behave unexpectedly for late events.
> *Bug analysis*
> Currently, for the cumulate window aggregate, late events that belong to an already cleaned slice are merged into the merged window state and counted in later slices.
> For example, take a CUMULATE window whose step is 1 minute and whose size is 1 day.
> {code:java}
> SELECT window_start, window_end, COUNT(USER_ID)
>   FROM TABLE(
>     CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
>   GROUP BY window_start, window_end;{code}
> When the watermark reaches 11:01, the result of window [00:00, 11:01) is emitted. Let's assume the result is INSERT (00:00, 11:01, 4).
> Then, if a late record with event time 11:00 arrives, it is merged into the merged state and counted in the later slices, for example in windows [00:00, 11:02), [00:00, 11:03)... But the already emitted window result INSERT (00:00, 11:01, 4) is not retracted or updated.
> The behavior is different if the job recovers from a savepoint/checkpoint.
> Let's take a savepoint after the watermark reaches 11:01 and (00:00, 11:01, 4) has been emitted.
> Then recover the job from the savepoint. Watermarks are not checkpointed, so they need to be repopulated. After recovery, the watermark may therefore roll back to 11:00. If a record with event time 11:00 then arrives, it is not treated as a late event, and once the watermark reaches 11:01 again, a window result INSERT (00:00, 11:01, 5) is emitted downstream.
> So the downstream operator receives two INSERT records for window (00:00, 11:01), which may lead to wrong results.
>
> *Solution*
> There are two solutions to the problem:
> # Save the watermark to state in the slice shared operator. (Preferred)
> # Change the behavior for late events, for example by retracting the emitted result and sending the updated one. This requires changing the slice state cleanup mechanism, because currently the slice state is cleaned once the watermark exceeds the slice end.
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
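
To make the "keep the watermark in state" idea concrete, here is a minimal sketch in plain Java. It is an assumption-laden illustration only: `MonotonicWatermarkGuard` and its methods are hypothetical names, not Flink classes, and the wiring into Flink's operator state (snapshot on checkpoint, restore on recovery) is left out. Timestamps are minutes of the day, so 11:01 is 661.

```java
import java.util.OptionalLong;

/**
 * Hypothetical helper (not part of Flink): remembers the highest watermark
 * emitted so far so that a restored job never emits a lower one.
 */
final class MonotonicWatermarkGuard {
    // Highest watermark emitted so far; Long.MIN_VALUE means "none yet".
    private long maxEmitted = Long.MIN_VALUE;

    /** Returns the watermark to emit, or empty if the candidate would not advance time. */
    OptionalLong onCandidate(long candidate) {
        if (candidate <= maxEmitted) {
            return OptionalLong.empty();
        }
        maxEmitted = candidate;
        return OptionalLong.of(candidate);
    }

    /** Value that would be written to operator state when a checkpoint/savepoint is taken. */
    long snapshot() {
        return maxEmitted;
    }

    /** Called on recovery with the value read back from state. */
    void restore(long restored) {
        maxEmitted = Math.max(maxEmitted, restored);
    }
}
```

With this guard, a job that took its savepoint at watermark 661 (11:01) and is then restored would drop a repopulated candidate of 660 (11:00) and only emit again once a candidate above 661 shows up. Whether the value is kept in the watermark assigner or in the slice shared operator is exactly the design question discussed above; the sketch does not settle it.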