[ 
https://issues.apache.org/jira/browse/FLINK-24501?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17432190#comment-17432190
 ] 

Wenlong Lyu commented on FLINK-24501:
-------------------------------------

I think it may be better to ensure that the watermark does not regress after 
restoring from a checkpoint/savepoint, rather than changing the operator's 
behavior to cover such an abnormal case. 
For example, could we add operator state to the watermark assigner so that it 
does not produce a wrong watermark after restore?
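
A rough sketch of that idea, only to illustrate it. MonotonicWatermarkAssigner 
and all of its methods are hypothetical names, not Flink classes or APIs; in a 
real job the remembered value would have to live in Flink operator state. 
Timestamps are plain long values in milliseconds.

```java
/**
 * Hypothetical sketch (not a Flink class): a watermark assigner that remembers
 * the highest watermark it has produced, so that a restored instance never
 * emits a lower one.
 */
final class MonotonicWatermarkAssigner {
    private final long maxOutOfOrdernessMs;
    // Highest watermark produced so far; this is the value to checkpoint.
    private long lastEmitted = Long.MIN_VALUE;

    MonotonicWatermarkAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Value that would be written to operator state at checkpoint time. */
    long snapshotState() {
        return lastEmitted;
    }

    /** Called once after recovery with the value read back from state. */
    void restoreState(long snapshot) {
        lastEmitted = Math.max(lastEmitted, snapshot);
    }

    /** Returns the watermark after this event; it never moves backwards. */
    long onEvent(long eventTimeMs) {
        long candidate = eventTimeMs - maxOutOfOrdernessMs;
        lastEmitted = Math.max(lastEmitted, candidate);
        return lastEmitted;
    }
}
```

With this, a restored assigner that first sees an event older than the 
snapshotted watermark still reports the snapshotted value, so downstream 
operators would keep treating that event as late.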

> Unexpected behavior of cumulate window aggregate for late event after recover 
> from sp/cp
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-24501
>                 URL: https://issues.apache.org/jira/browse/FLINK-24501
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: JING ZHANG
>            Assignee: JING ZHANG
>            Priority: Major
>              Labels: pull-request-available
>
> *Problem description*
> After recovering from a savepoint or checkpoint, the cumulate window 
> aggregate may handle late events unexpectedly.
> *Bug analysis*
> Currently, for the cumulate window aggregate, late events that belong to an 
> already cleaned slice are merged into the merged window state and are counted 
> in the later slices.
> For example, consider a CUMULATE window whose step is 1 minute and whose 
> size is 1 day.
> {code:java}
> SELECT window_start, window_end, COUNT(USER_ID)
>   FROM TABLE(
>     CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES,
>       INTERVAL '1' DAY))
>   GROUP BY window_start, window_end;{code}
> When the watermark reaches 11:01, the result of window [00:00, 11:01) is 
> emitted. Let's assume the result is INSERT (00:00, 11:01, 4).
> If a late record with event time 11:00 then arrives, it is merged into the 
> merged state and counted in the later slices, for example in windows 
> [00:00, 11:02), [00:00, 11:03), and so on. The already emitted window result 
> INSERT (00:00, 11:01, 4) is not retracted or updated.
> The behavior is different if the job recovers from a savepoint/checkpoint.
> Suppose we take a savepoint after the watermark reaches 11:01 and 
> (00:00, 11:01, 4) has been emitted.
> Then we recover the job from the savepoint. Watermarks are not checkpointed 
> and need to be regenerated, so after recovery the watermark may roll back to 
> 11:00. If a record with event time 11:00 then arrives, it is not treated as 
> a late event, and once the watermark reaches 11:01 again, a window result 
> INSERT (00:00, 11:01, 5) is emitted downstream.
> So the downstream operator receives two INSERT records for window (00:00, 
> 11:01), which may lead to wrong results.
>  
> *Solution*
> There are two possible solutions:
>  # Save the watermark to state in the slice shared operator. (Preferred)
>  # Change the behavior for late events, for example by retracting the 
> emitted result and sending the updated result. This requires changing the 
> slice state cleanup mechanism, because currently the slice state is cleaned 
> once the watermark exceeds the slice end.
>  
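
To make the scenario quoted above concrete, here is a toy replay of it. This 
is a simplified stand-in, not the Flink operator: CumulateLateEventDemo and its 
methods are hypothetical, times are minutes since 00:00, every record is simply 
merged into one running count, and a window result is emitted when the 
watermark reaches the window end. It assumes a savepoint taken with count 4 
after (00:00, 11:01, 4) was emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model of the quoted scenario; not Flink code. */
final class CumulateLateEventDemo {
    private long count;      // merged window state restored from the savepoint
    private long watermark;  // watermark the operator starts with after restore
    private final List<String> emitted = new ArrayList<>();

    private CumulateLateEventDemo(long restoredCount, long restoredWatermark) {
        this.count = restoredCount;
        this.watermark = restoredWatermark;
    }

    // Late or not, the record ends up in the merged state.
    private void onRecord() {
        count++;
    }

    // Emit one result per 1-minute slice end in (watermark, newWatermark].
    private void onWatermark(long newWatermark) {
        for (long end = watermark + 1; end <= newWatermark; end++) {
            emitted.add(String.format("INSERT (00:00, %02d:%02d, %d)",
                    end / 60, end % 60, count));
        }
        watermark = Math.max(watermark, newWatermark);
    }

    /** Replays: record at 11:00, then watermarks 11:01 and 11:02. */
    static List<String> replay(long restoredWatermark) {
        CumulateLateEventDemo op = new CumulateLateEventDemo(4L, restoredWatermark);
        op.onRecord();                // the record with event time 11:00
        op.onWatermark(11 * 60 + 1);  // watermark reaches 11:01
        op.onWatermark(11 * 60 + 2);  // watermark reaches 11:02
        return op.emitted;
    }
}
```

Calling replay(660), where the watermark has rolled back to 11:00, emits a 
second result for window end 11:01 with count 5. Calling replay(661), where 
the watermark is kept at 11:01, emits only the 11:02 result, which matches the 
behavior without a restart.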



