[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818283#comment-17818283 ]
Feifan Wang commented on FLINK-33734:
-------------------------------------

[~roman] :
{quote}Do I understand correctly, that the improvement now comes mostly from moving `offsets` serialization from JM to TM? {quote}
Yes, 68% of the time reduction comes from offloading offset serialization to the TM.
{quote}I'm wondering whether it would also make sense to move storing the handle from JM to TM and only return a pointer to it. {quote}
I think that also makes sense, but it would make rescaling more difficult.
{quote}Do you have the numbers for RPC times? {quote}
I haven't tracked RPC processing time, so I don't have specific figures. The judgment that RPC took a long time came from the flame graphs: [^flamegraph.control-group.html] [^flamegraph.only-merge-handle.html] [^flamegraph.merge-handle-and-serialize-on-tm.html]
{quote}As for ByteStreamStateHandle, we may need to change channel granular splitting to subtask granular splitting. {quote}
What I originally wanted to say here is that with the merged channel state handle we no longer need to extract bytes per channel. This may not be critical at this point, so we can leave it alone.

[~Zakelly] :
{quote}Overall it makes sense to offload file creation from JM to TM {quote}
In fact, this PR does not offload the creation of the metadata file to the TM; it only reorganizes the handle and offloads the serialization of the offsets in the handle to the TM.

The test that produced the above results was based on Flink 1.16.1. I constructed a job with a parallelism of 2000 and actively limited the throughput of the sink operator to simulate back pressure. The following figure shows the topology of the test job.
!https://km.sankuai.com/api/file/cdn/2059636095/80577277714?contentType=1&isNewContent=false|width=670,height=104!
{quote}I'm curious about why the checkpoint time reduced by 36s (from 66s to 30s) when serializing in TM side while the metadata time is only 12s. {quote}
Metadata serialization accounts for only 12s of the saving; the rest of it comes from shorter RPC processing.
{quote}And how the long-tail of async duration happened, is it due to massive file creation requests and hotspot issue in NN node of HDFS? {quote}
The test job runs on a shared HDFS cluster alongside many other jobs, so there are many uncontrollable factors, and I did not investigate this in detail.

> Merge unaligned checkpoint state handle
> ---------------------------------------
>
> Key: FLINK-33734
> URL: https://issues.apache.org/jira/browse/FLINK-33734
> Project: Flink
> Issue Type: Improvement
> Components: Runtime / Checkpointing
> Reporter: Feifan Wang
> Assignee: Feifan Wang
> Priority: Major
> Labels: pull-request-available
> Attachments: flamegraph.control-group.html, flamegraph.merge-handle-and-serialize-on-tm.html, flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
> h3. Background
> Unaligned checkpoints write the in-flight data of all InputChannels and ResultSubpartitions of the same subtask to the same file. InputChannelStateHandle and ResultSubpartitionStateHandle organize the metadata of this in-flight data at channel granularity, which causes the file path to be repeated many times. When a job is under back pressure and task parallelism is high, the metadata of unaligned checkpoints bloats. This results in:
> # The amount of data reported by the taskmanager to the jobmanager increases, and the jobmanager takes longer to process these RPC requests.
> # The metadata of the entire checkpoint becomes very large, and it takes longer to serialize it and write it to DFS.
> Both of the above points ultimately lead to a longer checkpoint duration.
> h3. A production example
> Take our production job with a parallelism of 4800 as an example:
> # When there is no back pressure, the checkpoint end-to-end duration is within 7 seconds.
> # When under back pressure, the checkpoint end-to-end duration often exceeds 1 minute. We found that the jobmanager took more than 40 seconds to process RPC requests, and serializing the metadata took more than 20 seconds. Some checkpoint statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950 MB in the metadata file, 68% are redundant file paths.
> We enabled log-based checkpoints on this job and hoped that checkpoints could complete within 30 seconds; this problem made that goal difficult to achieve.
> h3. Proposed changes
> I suggest introducing MergedInputChannelStateHandle and MergedResultSubpartitionStateHandle to eliminate the redundant file paths.
> The taskmanager merges all InputChannelStateHandles of the same subtask that share the same delegated StreamStateHandle into one MergedInputChannelStateHandle before reporting. When recovering from a checkpoint, the jobmanager converts each MergedInputChannelStateHandle back into a collection of InputChannelStateHandles before assigning state handles, and the rest of the process does not need to change.
> Structure of MergedInputChannelStateHandle:
> {code:java}
> { // MergedInputChannelStateHandle
>   "delegate": {
>     "filePath": "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>     "stateSize": 123456
>   },
>   "size": 2000,
>   "subtaskIndex": 0,
>   "channels": [ // One InputChannel per element
>     {
>       "info": { "gateIdx": 0, "inputChannelIdx": 0 },
>       "offsets": [ 100, 200, 300, 400 ],
>       "size": 1400
>     },
>     {
>       "info": { "gateIdx": 0, "inputChannelIdx": 1 },
>       "offsets": [ 500, 600 ],
>       "size": 600
>     }
>   ]
> }
> {code}
> MergedResultSubpartitionStateHandle is similar.
>
> WDYT [~roman], [~pnowojski], [~fanrui]?
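For illustration, here is a minimal sketch of the TM-side grouping the proposal describes: the per-channel handles of one subtask that share the same delegate file are collapsed into a single merged handle that stores the file path once. The types (ChannelHandle, ChannelEntry, MergedChannelHandle) are simplified stand-ins invented for this sketch, not the actual Flink classes from the PR.

{code:java}
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChannelHandleMergeSketch {

    // Simplified stand-ins for InputChannelStateHandle / MergedInputChannelStateHandle.
    record ChannelHandle(String delegateFilePath, int gateIdx, int channelIdx,
                         List<Long> offsets, long size) {}

    record ChannelEntry(int gateIdx, int channelIdx, List<Long> offsets, long size) {}

    record MergedChannelHandle(String delegateFilePath, int subtaskIndex,
                               long size, List<ChannelEntry> channels) {}

    /**
     * Groups the per-channel handles of one subtask by their delegate file path,
     * so each file path is stored once per subtask instead of once per channel.
     */
    static List<MergedChannelHandle> merge(int subtaskIndex, List<ChannelHandle> handles) {
        Map<String, List<ChannelHandle>> byDelegate = new LinkedHashMap<>();
        for (ChannelHandle h : handles) {
            byDelegate.computeIfAbsent(h.delegateFilePath(), k -> new ArrayList<>()).add(h);
        }
        List<MergedChannelHandle> merged = new ArrayList<>();
        for (Map.Entry<String, List<ChannelHandle>> e : byDelegate.entrySet()) {
            long totalSize = 0;
            List<ChannelEntry> channels = new ArrayList<>();
            for (ChannelHandle h : e.getValue()) {
                channels.add(new ChannelEntry(h.gateIdx(), h.channelIdx(), h.offsets(), h.size()));
                totalSize += h.size();
            }
            merged.add(new MergedChannelHandle(e.getKey(), subtaskIndex, totalSize, channels));
        }
        return merged;
    }
}
{code}

Recovery on the jobmanager would be the inverse of this grouping: each merged handle is expanded back into one per-channel handle per entry before state assignment.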