[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Feifan Wang updated FLINK-33734: -------------------------------- Attachment: flamegraph.control-group.html flamegraph.merge-handle-and-serialize-on-tm.html flamegraph.only-merge-handle.html > Merge unaligned checkpoint state handle > --------------------------------------- > > Key: FLINK-33734 > URL: https://issues.apache.org/jira/browse/FLINK-33734 > Project: Flink > Issue Type: Improvement > Components: Runtime / Checkpointing > Reporter: Feifan Wang > Assignee: Feifan Wang > Priority: Major > Labels: pull-request-available > Attachments: flamegraph.control-group.html, > flamegraph.merge-handle-and-serialize-on-tm.html, > flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png > > > h3. Background > Unaligned checkpoint will write the inflight-data of all InputChannel and > ResultSubpartition of the same subtask to the same file during checkpoint. > The InputChannelStateHandle and ResultSubpartitionStateHandle organize the > metadata of inflight-data at the channel granularity, which causes the file > name to be repeated many times. When a job is under backpressure and task > parallelism is high, the metadata of unaligned checkpoints will bloat. This > will result in: > # The amount of data reported by taskmanager to jobmanager increases, and > jobmanager takes longer to process these RPC requests. > # The metadata of the entire checkpoint becomes very large, and it takes > longer to serialize and write it to dfs. > Both of the above points ultimately lead to longer checkpoint duration. > h3. A Production example > Take our production job with a parallelism of 4800 as an example: > # When there is no back pressure, checkpoint end-to-end duration is within 7 > seconds. > # When under pressure: checkpoint end-to-end duration often exceeds 1 > minute. We found that jobmanager took more than 40 seconds to process rpc > requests, and serialized metadata took more than 20 seconds.Some checkpoint > statistics: > |metadata file size|950 MB| > |channel state count|12,229,854| > |channel file count|5536| > Of the 950MB in the metadata file, 68% are redundant file paths. > We enabled log-based checkpoint on this job and hoped that the checkpoint > could be completed within 30 seconds. This problem made it difficult to > achieve this goal. > h3. Propose changes > I suggest introducing MergedInputChannelStateHandle and > MergedResultSubpartitionStateHandle to eliminate redundant file paths. > The taskmanager merges all InputChannelStateHandles with the same delegated > StreamStateHandle in the same subtask into one MergedInputChannelStateHandle > before reporting. When recovering from checkpoint, jobmangager converts > MergedInputChannelStateHandle to InputChannelStateHandle collection before > assigning state handle, and the rest of the process does not need to be > changed. > Structure of MergedInputChannelStateHandle : > > {code:java} > { // MergedInputChannelStateHandle > "delegate": { > "filePath": > "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf", > "stateSize": 123456 > }, > "size": 2000, > "subtaskIndex":0, > "channels": [ // One InputChannel per element > { > "info": { > "gateIdx": 0, > "inputChannelIdx": 0 > }, > "offsets": [ > 100,200,300,400 > ], > "size": 1400 > }, > { > "info": { > "gateIdx": 0, > "inputChannelIdx": 1 > }, > "offsets": [ > 500,600 > ], > "size": 600 > } > ] > } > {code} > MergedResultSubpartitionStateHandle is similar. > > > WDYT [~roman] , [~pnowojski] , [~fanrui] ? -- This message was sent by Atlassian Jira (v8.20.10#820010)