[ https://issues.apache.org/jira/browse/FLINK-26062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17490020#comment-17490020 ]
Roman Khachatryan commented on FLINK-26062:
-------------------------------------------

Good point, yes, peek() is also affected. However:
1. When processing a timer, it is polled after peeking it. That poll() will be recorded, and the result will be the same after recovery.
2. When peeking a timer without removing it, only its timestamp is checked, and that timestamp is the same for all timers at the head of the queue.
3. PriorityQueue state is not exposed to the user, so there can be no side effects generated by a user program.

Any side effects resulting from processing a timer, such as writing to an external system, are subject to the existing constraints (i.e. side effects should only be committed after the checkpoint completion notification).

So I think this shouldn't be an issue.

> [Changelog] Non-deterministic recovery of PriorityQueue states
> --------------------------------------------------------------
>
> Key: FLINK-26062
> URL: https://issues.apache.org/jira/browse/FLINK-26062
> Project: Flink
> Issue Type: Bug
> Components: Runtime / State Backends
> Affects Versions: 1.15.0
> Reporter: Roman Khachatryan
> Assignee: Roman Khachatryan
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.15.0
>
> Currently, InternalPriorityQueue.poll() is logged as a separate operation,
> without specifying the element that has been polled. On recovery, this
> recorded poll() is replayed.
> However, this is not deterministic because the order of PQ elements with
> equal priority is not specified. For example, TimerHeapInternalTimer only
> compares timestamps, which are often equal. This results in polling timers
> from the queue in the wrong order => dropping timers => not firing timers.
>
> ProcessingTimeWindowCheckpointingITCase.testAggregatingSlidingProcessingTimeWindow
> fails with materialization enabled and using the heap state backend (both
> in-memory and fs-based implementations).
>
> The proposed solution is to replace the poll operation with a remove operation
> (which is based on equality).
>
> cc: [~masteryhx], [~ym], [~yunta]
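The replay problem described in the issue can be reproduced outside Flink with a small sketch. It assumes `java.util.PriorityQueue` as a stand-in for Flink's heap-based queue, a hypothetical `Timer` class that, like TimerHeapInternalTimer, is ordered by timestamp only, and a recovery path that rebuilds the queue with the tied timers inserted in a different order. Replaying a bare poll() removes whichever tied timer happens to be at the head of the recovered queue. Replaying remove(element) removes the timer that was actually polled, because `PriorityQueue.remove(Object)` matches elements with `equals()`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;

// Hypothetical, simplified model of the replay issue; not Flink code.
final class PqReplayDemo {

    // Hypothetical stand-in for a timer: a key plus a timestamp.
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        // Equality covers key AND timestamp, so remove(Object) finds the exact timer.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return key.equals(other.key) && timestamp == other.timestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }

        @Override
        public String toString() {
            return key + "@" + timestamp;
        }
    }

    // Ordering compares timestamps only, so timers with equal timestamps are ties.
    static final Comparator<Timer> BY_TIMESTAMP = Comparator.comparingLong(t -> t.timestamp);

    // Builds a queue by offering the timers one by one in the given order.
    static PriorityQueue<Timer> queueOf(List<Timer> insertionOrder) {
        PriorityQueue<Timer> pq = new PriorityQueue<>(BY_TIMESTAMP);
        for (Timer t : insertionOrder) {
            pq.offer(t);
        }
        return pq;
    }

    // Replays a logged poll(): removes whatever tied timer is at the head.
    static List<Timer> replayPoll(PriorityQueue<Timer> recovered) {
        recovered.poll();
        return drain(recovered);
    }

    // Replays a logged remove(element): removes the timer that was actually polled.
    static List<Timer> replayRemove(PriorityQueue<Timer> recovered, Timer polled) {
        recovered.remove(polled);
        return drain(recovered);
    }

    // Polls all remaining timers in queue order.
    static List<Timer> drain(PriorityQueue<Timer> pq) {
        List<Timer> out = new ArrayList<>();
        while (!pq.isEmpty()) {
            out.add(pq.poll());
        }
        return out;
    }

    public static void main(String[] args) {
        Timer a = new Timer("a", 10L);
        Timer b = new Timer("b", 10L);

        // Original run: the timer at the head is polled and fired.
        Timer polled = queueOf(List.of(a, b)).poll();

        // Recovery: same timers, different insertion order (an assumption for illustration).
        System.out.println("polled:        " + polled);
        System.out.println("replay poll:   " + replayPoll(queueOf(List.of(b, a))));
        System.out.println("replay remove: " + replayRemove(queueOf(List.of(b, a)), polled));
    }
}
```

With this JDK heap, the original run polls `a@10`. Replaying poll() on the recovered queue leaves `[a@10]`, so `b` is dropped and never fires while `a` would fire twice. Replaying remove(`a@10`) leaves `[b@10]`, matching the pre-failure state. Note that peek() on either recovered queue returns a timer with timestamp 10, which is consistent with the comment's point that a peek that only inspects the timestamp is unaffected.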