[ https://issues.apache.org/jira/browse/FLINK-26062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490020#comment-17490020 ]

Roman Khachatryan commented on FLINK-26062:
-------------------------------------------

Good point, yes, peek() is also affected.
However:
1. When processing a timer, it is polled after peeking it. That poll() is 
recorded, so the result will be the same after recovery.
2. When peeking a timer without removing it, only its timestamp is checked, 
and that timestamp is the same for all timers tied at the head of the queue.
3. PriorityQueue state is not exposed to the user, so there can be no 
side effects generated by a user program.

Any side-effects resulting from processing a timer, such as writing to an 
external system, are subject to the existing constraints (i.e. any side-effects 
should only be committed after the checkpoint completion notification).

So I think this shouldn't be an issue.
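To illustrate point 2, here is a minimal Java sketch. It uses `java.util.PriorityQueue` with a timestamp-only comparator as a stand-in for Flink's heap-based timer queue. The `Timer` class, `queueOf()` and `headIsDue()` are hypothetical. The idea that recovery restores the same timers in a different order is an assumption made only for illustration. When two timers tie on timestamp, peek() may return a different element after recovery. The timestamp it exposes, and so the decision whether the head is due, stays the same.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

public class PeekOnTiesSketch {
    // Hypothetical timer, ordered by timestamp only (mirrors the issue's description).
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Builds a queue by offering timers in the given order.
    static PriorityQueue<Timer> queueOf(Timer... insertionOrder) {
        PriorityQueue<Timer> q = new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
        for (Timer t : insertionOrder) {
            q.offer(t);
        }
        return q;
    }

    // Hypothetical check done before firing: only the head's timestamp is inspected.
    static boolean headIsDue(PriorityQueue<Timer> q, long watermark) {
        Timer head = q.peek();
        return head != null && head.timestamp <= watermark;
    }

    public static void main(String[] args) {
        Timer a = new Timer("a", 10);
        Timer b = new Timer("b", 10);
        Timer c = new Timer("c", 20);

        PriorityQueue<Timer> beforeFailure = queueOf(a, b, c);
        PriorityQueue<Timer> afterRecovery = queueOf(b, a, c); // assumed different restore order

        // java.util.PriorityQueue keeps the first-inserted timer of a tie at the head,
        // so the two heads are different objects here...
        System.out.println(beforeFailure.peek().key + " vs " + afterRecovery.peek().key);
        // ...but the timestamp seen via peek(), and the resulting decision, are identical.
        System.out.println(beforeFailure.peek().timestamp == afterRecovery.peek().timestamp);
        System.out.println(headIsDue(beforeFailure, 15) == headIsDue(afterRecovery, 15));
    }
}
```

The sketch only covers the peek-only path. Point 1 (peek followed by a recorded poll) depends on how the poll is logged, which is what the issue itself addresses.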

> [Changelog] Non-deterministic recovery of PriorityQueue states
> --------------------------------------------------------------
>
>                 Key: FLINK-26062
>                 URL: https://issues.apache.org/jira/browse/FLINK-26062
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / State Backends
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Assignee: Roman Khachatryan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.0
>
>
> Currently, InternalPriorityQueue.poll() is logged as a separate operation, 
> without specifying the element that has been polled. On recovery, this 
> recorded poll() is replayed.
> However, this is not deterministic because the order of PQ elements with 
> equal priority is not specified. For example, TimerHeapInternalTimer only 
> compares timestamps, which are often equal. This results in polling timers 
> from the queue in the wrong order => dropping timers => not firing them.
>  
> ProcessingTimeWindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow
>  fails with materialization enabled and the heap state backend (both 
> in-memory and fs-based implementations).
>  
> The proposed solution is to replace poll with a remove operation (which is 
> based on equality).
>  
> cc: [~masteryhx], [~ym], [~yunta]
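Here is a minimal Java sketch of the problem and the proposed fix described in the issue. It again uses `java.util.PriorityQueue` with a timestamp-only comparator as a stand-in for the heap-based queue. `Timer`, `queueOf()`, `replayPolls()`, `replayRemoves()` and `runScenario()` are hypothetical. The changelog is modelled very simply, as either a number of bare poll() calls or a list of removed elements. The sketch assumes the restored queue receives its timers in a different order than the original run did, so a different tied timer sits at the head. Replaying a bare poll() then removes the wrong timer. Replaying remove(element) removes exactly the timer that was polled originally.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

public class ChangelogReplaySketch {
    // Hypothetical timer: equality uses key and timestamp, ordering uses the timestamp only.
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) return false;
            Timer other = (Timer) o;
            return key.equals(other.key) && timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    static PriorityQueue<Timer> queueOf(List<Timer> insertionOrder) {
        PriorityQueue<Timer> q = new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
        for (Timer t : insertionOrder) q.offer(t);
        return q;
    }

    // Replays recorded poll() calls that do not say which element was polled.
    static Set<Timer> replayPolls(PriorityQueue<Timer> recovered, int polls) {
        for (int i = 0; i < polls; i++) recovered.poll();
        return new HashSet<>(recovered);
    }

    // Replays recorded removals; PriorityQueue.remove(Object) locates the element via equals().
    static Set<Timer> replayRemoves(PriorityQueue<Timer> recovered, List<Timer> removed) {
        for (Timer t : removed) recovered.remove(t);
        return new HashSet<>(recovered);
    }

    static Map<String, Set<Timer>> runScenario() {
        Timer a = new Timer("a", 10), b = new Timer("b", 10), c = new Timer("c", 20);

        // Original run: insert a, b, c, then poll the head (one of the two timers tied at 10).
        PriorityQueue<Timer> original = queueOf(Arrays.asList(a, b, c));
        Timer polled = original.poll();

        // Assumed recovery: the same timers are restored in a different order.
        List<Timer> restoreOrder = Arrays.asList(b, a, c);

        Map<String, Set<Timer>> result = new HashMap<>();
        result.put("expected", new HashSet<>(original));
        result.put("viaPoll", replayPolls(queueOf(restoreOrder), 1));
        result.put("viaRemove", replayRemoves(queueOf(restoreOrder), Collections.singletonList(polled)));
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<Timer>> r = runScenario();
        System.out.println("expected:   " + r.get("expected"));
        System.out.println("via poll:   " + r.get("viaPoll") + " matches=" + r.get("viaPoll").equals(r.get("expected")));
        System.out.println("via remove: " + r.get("viaRemove") + " matches=" + r.get("viaRemove").equals(r.get("expected")));
    }
}
```

With this input, the original run polls `a`, while the replayed bare poll() on the restored queue removes `b`. As a result, `viaPoll` no longer matches `expected`, and `b` is lost even though it never fired. The remove-based replay leaves the same timers as the original run.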



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
