Roman Khachatryan created FLINK-26019:
-----------------------------------------

             Summary: Changelogged PriorityQueue elements recovered out-of-order
                 Key: FLINK-26019
                 URL: https://issues.apache.org/jira/browse/FLINK-26019
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.15.0
            Reporter: Roman Khachatryan
            Assignee: Roman Khachatryan
             Fix For: 1.15.0


StateChangeFormat is the class responsible for writing out changelog data.
Each chunk of data is sorted by: logId -> sequenceNumber -> keyGroup.
Sorting by sequenceNumber preserves temporal order.
Sorting by keyGroup a) puts metadata (group -1) at the beginning and b) allows 
each key group (KG) to be written only once.
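
A minimal sketch of that ordering, for illustration only. The Change class, 
its fields and the payload strings are hypothetical stand-ins, not the actual 
types used by StateChangeFormat; only the sort key (logId -> sequenceNumber -> 
keyGroup) and the metadata group -1 come from the description above.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class ChangeOrderSketch {
    // Metadata is logged under key group -1, so it sorts before real groups.
    static final int METADATA_KEY_GROUP = -1;

    // Hypothetical model of a single logged change (not a Flink class).
    static final class Change {
        final String logId;
        final long sequenceNumber;
        final int keyGroup;
        final String payload;

        Change(String logId, long sequenceNumber, int keyGroup, String payload) {
            this.logId = logId;
            this.sequenceNumber = sequenceNumber;
            this.keyGroup = keyGroup;
            this.payload = payload;
        }
    }

    // logId -> sequenceNumber -> keyGroup
    static final Comparator<Change> WRITE_ORDER =
            Comparator.<Change, String>comparing(c -> c.logId)
                    .thenComparingLong(c -> c.sequenceNumber)
                    .thenComparingInt(c -> c.keyGroup);

    // Returns the payloads in the order they would be written out.
    static List<String> sortedPayloads(List<Change> changes) {
        List<Change> sorted = new ArrayList<>(changes);
        sorted.sort(WRITE_ORDER); // stable: ties keep their original order
        List<String> payloads = new ArrayList<>();
        for (Change c : sorted) {
            payloads.add(c.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<Change> changes = List.of(
                new Change("log-a", 1, 3, "kg3-update"),
                new Change("log-a", 1, METADATA_KEY_GROUP, "metadata"),
                new Change("log-a", 0, 5, "kg5-update"),
                new Change("log-a", 1, 3, "kg3-second-update"));
        // prints [kg5-update, metadata, kg3-update, kg3-second-update]
        System.out.println(sortedPayloads(changes));
    }
}
```

Within one sequenceNumber, changes of the same key group keep their relative 
order, but changes of different key groups do not.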

However, the assumption that the order of changes across groups doesn't matter 
currently doesn't hold: a poll operation on InternalPriorityQueue may affect 
any group (the smallest item across all groups so far will be polled).

As a result, the wrong processing time timers are removed on recovery in 
ProcessingTimeWindowCheckpointingITCase#testAggregatingSlidingProcessingTimeWindow.
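
The effect can be reproduced with a toy model. This is a sketch under stated 
assumptions, not the actual Flink code: the Change class is hypothetical, all 
changes are assumed to share one sequenceNumber, and the poll is assumed to be 
logged under whatever key group is current (here 1) rather than under the 
group of the polled element (here 2).

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class PollReorderSketch {
    // Hypothetical change record: ADD carries a timestamp, POLL carries none.
    static final class Change {
        final int keyGroup;   // key group the change was logged under
        final boolean isPoll;
        final long timestamp; // only meaningful for ADD

        private Change(int keyGroup, boolean isPoll, long timestamp) {
            this.keyGroup = keyGroup;
            this.isPoll = isPoll;
            this.timestamp = timestamp;
        }

        static Change add(int keyGroup, long timestamp) {
            return new Change(keyGroup, false, timestamp);
        }

        static Change poll(int loggedUnderKeyGroup) {
            return new Change(loggedUnderKeyGroup, true, 0L);
        }
    }

    // Applies the changes in the given order; returns the remaining timers.
    static List<Long> replay(List<Change> changes) {
        PriorityQueue<Long> timers = new PriorityQueue<>();
        for (Change c : changes) {
            if (c.isPoll) {
                timers.poll(); // removes the smallest timer present right now
            } else {
                timers.add(c.timestamp);
            }
        }
        List<Long> remaining = new ArrayList<>(timers);
        remaining.sort(Comparator.naturalOrder());
        return remaining;
    }

    // Mimics the KG-sorting applied when a chunk is written out.
    static List<Change> sortedByKeyGroup(List<Change> changes) {
        List<Change> sorted = new ArrayList<>(changes);
        sorted.sort(Comparator.comparingInt((Change c) -> c.keyGroup));
        return sorted;
    }

    public static void main(String[] args) {
        List<Change> log = List.of(
                Change.add(2, 10L),  // timer 10 belongs to KG 2
                Change.add(1, 20L),  // timer 20 belongs to KG 1
                Change.poll(1));     // polls timer 10, but is logged under KG 1
        System.out.println(replay(log));                   // prints [20]
        System.out.println(replay(sortedByKeyGroup(log))); // prints [10]
    }
}
```

In temporal order the poll removes timer 10. After KG-sorting, the poll is 
replayed before timer 10 has been added, so it removes timer 20 instead.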

One way to solve this problem is to simply disable KG-sorting and grouping 
(only output metadata at the beginning). 
The other is to associate the polled element with the correct key group while 
logging changes.
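
A rough sketch of the second option, to show why it tolerates KG-sorting. 
Everything here is hypothetical (LoggingQueue, Timer, Change, and the choice to 
log a poll as a removal of a specific element); it is not the actual fix, and 
it assumes timestamps are unique.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class KeyGroupedPollSketch {
    // Hypothetical timer that remembers its own key group.
    static final class Timer {
        final int keyGroup;
        final long timestamp;

        Timer(int keyGroup, long timestamp) {
            this.keyGroup = keyGroup;
            this.timestamp = timestamp;
        }
    }

    // Hypothetical change record: either an add or a removal of one timer.
    static final class Change {
        final int keyGroup;
        final boolean isRemove;
        final long timestamp;

        Change(int keyGroup, boolean isRemove, long timestamp) {
            this.keyGroup = keyGroup;
            this.isRemove = isRemove;
            this.timestamp = timestamp;
        }
    }

    // Queue that logs every change under the key group of the affected timer.
    static final class LoggingQueue {
        private final PriorityQueue<Timer> queue =
                new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
        final List<Change> log = new ArrayList<>();

        void add(int keyGroup, long timestamp) {
            queue.add(new Timer(keyGroup, timestamp));
            log.add(new Change(keyGroup, false, timestamp));
        }

        void poll() {
            Timer polled = queue.poll();
            if (polled != null) {
                // Log under the polled timer's key group, not the current one.
                log.add(new Change(polled.keyGroup, true, polled.timestamp));
            }
        }
    }

    // Replays the log after sorting it by key group, as on recovery.
    static List<Long> replaySortedByKeyGroup(List<Change> log) {
        List<Change> sorted = new ArrayList<>(log);
        sorted.sort(Comparator.comparingInt((Change c) -> c.keyGroup));
        PriorityQueue<Long> timers = new PriorityQueue<>();
        for (Change c : sorted) {
            if (c.isRemove) {
                timers.remove(Long.valueOf(c.timestamp));
            } else {
                timers.add(c.timestamp);
            }
        }
        List<Long> remaining = new ArrayList<>(timers);
        remaining.sort(Comparator.naturalOrder());
        return remaining;
    }

    public static void main(String[] args) {
        LoggingQueue q = new LoggingQueue();
        q.add(2, 10L);
        q.add(1, 20L);
        q.poll(); // removes timer 10, logged under KG 2
        System.out.println(replaySortedByKeyGroup(q.log)); // prints [20]
    }
}
```

Because each removal lands in the same key group as the matching add, and the 
sort keeps the order within a group, the removal is always replayed after the 
add it refers to.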

Both ways should work with re-scaling.

cc: [~masteryhx], [~ym], [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
