JING ZHANG created FLINK-24501:
----------------------------------

             Summary: Unexpected behavior of cumulate window aggregate for late event after recover from sp/cp
                 Key: FLINK-24501
                 URL: https://issues.apache.org/jira/browse/FLINK-24501
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
            Reporter: JING ZHANG


*Problem description*

After recovering from a savepoint or checkpoint, a cumulate window aggregate may handle late events differently than it would without the failover, and the same window result can be emitted twice.

*Bug analysis*

Currently, for a cumulate window aggregate, a late event that belongs to an already cleaned slice is merged into the merged window state, and is therefore counted in the later slices.

For example, consider a CUMULATE window with a step of 1 minute and a size of 1 day:
{code:sql}
SELECT window_start, window_end, COUNT(USER_ID)
  FROM TABLE(
    CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '1' MINUTES, INTERVAL '1' DAY))
  GROUP BY window_start, window_end;
{code}
When the watermark reaches 11:01, the result of window [00:00, 11:01) is emitted. Assume the result is INSERT (00:00, 11:01, 4).

If a late record with event time 11:00 then arrives, it is merged into the merged state and counted in the later windows, for example [00:00, 11:02), [00:00, 11:03), and so on. However, the already emitted result INSERT (00:00, 11:01, 4) is neither retracted nor updated.
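To make the late-event path concrete, here is a small, self-contained Java model of the behavior described above. It is not Flink code: the class and method names (CumulateCount, onElement, onWatermark, run) are hypothetical, times are minutes since 00:00, the aggregate is a plain count, and a record is assumed to be late when its slice end is not after the current watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LateEventDemo {

    /** Hypothetical, simplified slice-shared CUMULATE count (step 1 min, size 1 day, start 00:00). */
    static class CumulateCount {
        static final long STEP = 1;                          // step in minutes
        final TreeMap<Long, Long> slices = new TreeMap<>();  // sliceEnd -> count, not fired yet
        long merged = 0;                                     // merged window state of fired slices
        long watermark = Long.MIN_VALUE;                     // current event-time progress
        final List<String> out = new ArrayList<>();          // records sent downstream

        void onElement(long eventTime) {
            long sliceEnd = (eventTime / STEP + 1) * STEP;
            if (sliceEnd <= watermark) {
                // Late: the slice was already fired and cleaned, so the record goes straight
                // into the merged state and only affects windows that fire later.
                merged++;
            } else {
                slices.merge(sliceEnd, 1L, Long::sum);
            }
        }

        void onWatermark(long wm) {
            watermark = Math.max(watermark, wm);             // never move progress backwards
            // Fire every slice whose end the watermark has reached, emitting [00:00, sliceEnd).
            while (!slices.isEmpty() && slices.firstKey() <= watermark) {
                Map.Entry<Long, Long> slice = slices.pollFirstEntry();
                merged += slice.getValue();
                out.add("INSERT (00:00, " + hhmm(slice.getKey()) + ", " + merged + ")");
            }
        }

        static String hhmm(long minutes) {
            return String.format("%02d:%02d", minutes / 60, minutes % 60);
        }
    }

    static List<String> run() {
        CumulateCount op = new CumulateCount();
        for (int i = 0; i < 4; i++) {
            op.onElement(660);          // four records at 11:00
        }
        op.onWatermark(661);            // watermark 11:01 -> INSERT (00:00, 11:01, 4)
        op.onElement(660);              // late record at 11:00; the result above is not updated
        op.onElement(661);              // on-time record at 11:01
        op.onWatermark(662);            // watermark 11:02 -> the late record is counted here
        return op.out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running it prints INSERT (00:00, 11:01, 4) followed by INSERT (00:00, 11:02, 6): the late record only shows up in the later window.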

The behavior is different if the job recovers from a savepoint/checkpoint. Suppose a savepoint is taken after the watermark has reached 11:01 and (00:00, 11:01, 4) has been emitted.

Then the job is recovered from the savepoint. Watermarks are not checkpointed, so they have to be regenerated after recovery, and the watermark may fall back to 11:00. If a record with event time 11:00 arrives now, it is not processed as a late event. Once the watermark reaches 11:01 again, the window result INSERT (00:00, 11:01, 5) is emitted downstream.

As a result, the downstream operator receives two INSERT records for window [00:00, 11:01), which may lead to wrong results.
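The recovery scenario can be reproduced with the same kind of simplified model. As an assumption of this sketch, a savepoint copies the slice state and the merged state but not the watermark, which restarts from Long.MIN_VALUE after restore; all names (CumulateCount, restore, run) are hypothetical and are not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RecoveryDemo {

    /** Hypothetical, simplified slice-shared CUMULATE count (step 1 min, size 1 day, start 00:00). */
    static class CumulateCount {
        static final long STEP = 1;                          // step in minutes
        final TreeMap<Long, Long> slices = new TreeMap<>();  // sliceEnd -> count, not fired yet
        long merged = 0;                                     // merged window state of fired slices
        long watermark = Long.MIN_VALUE;                     // current event-time progress
        final List<String> out = new ArrayList<>();          // records sent downstream

        void onElement(long eventTime) {
            long sliceEnd = (eventTime / STEP + 1) * STEP;
            if (sliceEnd <= watermark) {
                merged++;                                    // late: only later windows see it
            } else {
                slices.merge(sliceEnd, 1L, Long::sum);
            }
        }

        void onWatermark(long wm) {
            watermark = Math.max(watermark, wm);
            while (!slices.isEmpty() && slices.firstKey() <= watermark) {
                Map.Entry<Long, Long> slice = slices.pollFirstEntry();
                merged += slice.getValue();
                out.add("INSERT (00:00, " + hhmm(slice.getKey()) + ", " + merged + ")");
            }
        }

        static String hhmm(long minutes) {
            return String.format("%02d:%02d", minutes / 60, minutes % 60);
        }
    }

    // Assumption of this model: the snapshot holds slice state and merged state only.
    static CumulateCount restore(CumulateCount snapshot) {
        CumulateCount restored = new CumulateCount();        // watermark starts at Long.MIN_VALUE
        restored.slices.putAll(snapshot.slices);
        restored.merged = snapshot.merged;
        return restored;
    }

    static List<String> run() {
        CumulateCount before = new CumulateCount();
        for (int i = 0; i < 4; i++) {
            before.onElement(660);      // four records at 11:00
        }
        before.onWatermark(661);        // INSERT (00:00, 11:01, 4), then the savepoint is taken
        CumulateCount after = restore(before);
        after.onWatermark(660);         // regenerated watermark is only 11:00
        after.onElement(660);           // a record at 11:00 is no longer treated as late
        after.onWatermark(661);         // fires [00:00, 11:01) a second time
        List<String> downstream = new ArrayList<>(before.out);
        downstream.addAll(after.out);
        return downstream;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

It prints two INSERT records for window [00:00, 11:01), with counts 4 and 5.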

 

*Solution*

There are two possible solutions:
 # Save the watermark to state in the slice-shared operator. (Preferred)
 # Change the behavior for late events, for example retract the emitted result and send the updated result. This requires changing the slice state cleanup mechanism, because the slice state is currently cleaned once the watermark passes the slice end.
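A sketch of the preferred solution under a simplified model: the watermark is snapshotted together with the slice state and restored from it, and a regenerated watermark below the restored one is ignored. This only illustrates the idea. How a real slice-shared operator would store the watermark (which state type, and how to combine values from several subtasks after rescaling) is not specified in this issue, and all names here (CumulateCount, restore, run) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PersistedWatermarkDemo {

    /** Hypothetical, simplified slice-shared CUMULATE count (step 1 min, size 1 day, start 00:00). */
    static class CumulateCount {
        static final long STEP = 1;                          // step in minutes
        final TreeMap<Long, Long> slices = new TreeMap<>();  // sliceEnd -> count, not fired yet
        long merged = 0;                                     // merged window state of fired slices
        long watermark = Long.MIN_VALUE;                     // current event-time progress
        final List<String> out = new ArrayList<>();          // records sent downstream

        void onElement(long eventTime) {
            long sliceEnd = (eventTime / STEP + 1) * STEP;
            if (sliceEnd <= watermark) {
                merged++;                                    // late: only later windows see it
            } else {
                slices.merge(sliceEnd, 1L, Long::sum);
            }
        }

        void onWatermark(long wm) {
            watermark = Math.max(watermark, wm);             // a lower watermark is ignored
            while (!slices.isEmpty() && slices.firstKey() <= watermark) {
                Map.Entry<Long, Long> slice = slices.pollFirstEntry();
                merged += slice.getValue();
                out.add("INSERT (00:00, " + hhmm(slice.getKey()) + ", " + merged + ")");
            }
        }

        static String hhmm(long minutes) {
            return String.format("%02d:%02d", minutes / 60, minutes % 60);
        }
    }

    // Solution 1 (sketch): the watermark is part of the snapshot and is restored with the state.
    static CumulateCount restore(CumulateCount snapshot) {
        CumulateCount restored = new CumulateCount();
        restored.slices.putAll(snapshot.slices);
        restored.merged = snapshot.merged;
        restored.watermark = snapshot.watermark;
        return restored;
    }

    static List<String> run() {
        CumulateCount before = new CumulateCount();
        for (int i = 0; i < 4; i++) {
            before.onElement(660);      // four records at 11:00
        }
        before.onWatermark(661);        // INSERT (00:00, 11:01, 4), then the savepoint is taken
        CumulateCount after = restore(before);
        after.onWatermark(660);         // regenerated 11:00 does not move progress back
        after.onElement(660);           // still late: merged into the merged state only
        after.onElement(661);           // on-time record at 11:01
        after.onWatermark(662);         // INSERT (00:00, 11:02, 6)
        List<String> downstream = new ArrayList<>(before.out);
        downstream.addAll(after.out);
        return downstream;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

It prints INSERT (00:00, 11:01, 4) and INSERT (00:00, 11:02, 6), i.e. the output the model produces when no failover happens, and window [00:00, 11:01) is emitted only once.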

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
