[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17834603#comment-17834603 ]

Zakelly Lan commented on FLINK-33734:
-------------------------------------

I'd +1 for this optimization.

> Merge unaligned checkpoint state handle
> ---------------------------------------
>
>                 Key: FLINK-33734
>                 URL: https://issues.apache.org/jira/browse/FLINK-33734
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Assignee: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: flamegraph.control-group.html, flamegraph.merge-handle-and-serialize-on-tm.html, flamegraph.only-merge-handle.html, image-2024-02-18-15-12-20-665.png
>
>
> h3. Background
> An unaligned checkpoint writes the in-flight data of all InputChannels and
> ResultSubpartitions of the same subtask to the same file.
> The InputChannelStateHandle and ResultSubpartitionStateHandle organize the
> metadata of the in-flight data at channel granularity, which causes the file
> name to be repeated many times. When a job is under backpressure and task
> parallelism is high, the metadata of unaligned checkpoints bloats. This
> results in:
> # The amount of data reported by the taskmanager to the jobmanager increases, and
> the jobmanager takes longer to process these RPC requests.
> # The metadata of the entire checkpoint becomes very large, and it takes
> longer to serialize it and write it to DFS.
> Both of the above points ultimately lead to a longer checkpoint duration.
> h3. A production example
> Take our production job with a parallelism of 4800 as an example:
> # When there is no backpressure, the checkpoint end-to-end duration is within 7
> seconds.
> # When under backpressure, the checkpoint end-to-end duration often exceeds 1
> minute. We found that the jobmanager took more than 40 seconds to process RPC
> requests, and serializing the metadata took more than 20 seconds.
> Some checkpoint statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950 MB in the metadata file, 68% is redundant file paths.
> We enabled log-based checkpoints on this job and hoped that a checkpoint
> could be completed within 30 seconds. This problem made it difficult to
> achieve that goal.
> h3. Proposed changes
> I suggest introducing MergedInputChannelStateHandle and
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> The taskmanager merges all InputChannelStateHandles with the same delegated
> StreamStateHandle in the same subtask into one MergedInputChannelStateHandle
> before reporting. When recovering from a checkpoint, the jobmanager converts each
> MergedInputChannelStateHandle back into a collection of InputChannelStateHandles before
> assigning state handles, and the rest of the process does not need to be
> changed.
> Structure of MergedInputChannelStateHandle:
>
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex": 0,
>     "channels": [   // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100, 200, 300, 400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500, 600
>             ],
>             "size": 600
>         }
>     ]
> }
> {code}
> MergedResultSubpartitionStateHandle is similar.
>
>
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
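The merge and un-merge steps described under the proposed changes in the quoted issue can be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the pull request: `MergedHandleSketch`, `ChannelHandle`, `ChannelEntry`, `MergedHandle`, `merge` and `unmerge` are hypothetical names, the delegate is reduced to a plain file path string, and fields such as `subtaskIndex` and the delegate's `stateSize` are left out. It assumes Java 16 or later for records.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MergedHandleSketch {

    /** Hypothetical per-channel handle: the file path is repeated for every channel. */
    public record ChannelHandle(String filePath, int gateIdx, int channelIdx,
                                List<Long> offsets, long size) {}

    /** Hypothetical per-channel entry inside a merged handle: no file path. */
    public record ChannelEntry(int gateIdx, int channelIdx, List<Long> offsets, long size) {}

    /** Hypothetical merged handle: the file path is stored once per file. */
    public record MergedHandle(String filePath, long size, List<ChannelEntry> channels) {}

    /** Groups per-channel handles by file path, as the taskmanager would before reporting. */
    public static List<MergedHandle> merge(List<ChannelHandle> handles) {
        Map<String, List<ChannelEntry>> byFile = new LinkedHashMap<>();
        for (ChannelHandle h : handles) {
            byFile.computeIfAbsent(h.filePath(), k -> new ArrayList<>())
                  .add(new ChannelEntry(h.gateIdx(), h.channelIdx(), h.offsets(), h.size()));
        }
        List<MergedHandle> merged = new ArrayList<>();
        for (Map.Entry<String, List<ChannelEntry>> e : byFile.entrySet()) {
            long total = 0;
            for (ChannelEntry c : e.getValue()) {
                total += c.size(); // merged size is the sum of the channel sizes
            }
            merged.add(new MergedHandle(e.getKey(), total, e.getValue()));
        }
        return merged;
    }

    /** Expands a merged handle back into per-channel handles, as on recovery. */
    public static List<ChannelHandle> unmerge(MergedHandle m) {
        List<ChannelHandle> out = new ArrayList<>();
        for (ChannelEntry c : m.channels()) {
            out.add(new ChannelHandle(m.filePath(), c.gateIdx(), c.channelIdx(),
                                      c.offsets(), c.size()));
        }
        return out;
    }

    public static void main(String[] args) {
        String path = "hdfs:///checkpoints/chk-31/channel-state-file"; // hypothetical path
        List<ChannelHandle> handles = List.of(
            new ChannelHandle(path, 0, 0, List.of(100L, 200L, 300L, 400L), 1400L),
            new ChannelHandle(path, 0, 1, List.of(500L, 600L), 600L));
        List<MergedHandle> merged = merge(handles);
        System.out.println(merged.size() + " merged handle(s), total size "
            + merged.get(0).size());
        System.out.println("round trip equal: " + unmerge(merged.get(0)).equals(handles));
    }
}
```

The sample values mirror the structure quoted above: two channels of sizes 1400 and 600 that share one file collapse into a single merged handle of size 2000, and expanding it again yields the original per-channel handles, which is why the rest of the recovery path would not need to change.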
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17830472#comment-17830472 ]

Feifan Wang commented on FLINK-33734:
-------------------------------------

Kindly ping [~Zakelly] , [~roman] .
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818283#comment-17818283 ]

Feifan Wang commented on FLINK-33734:
-------------------------------------

[~roman] :
{quote}Do I understand correctly, that the improvement now comes mostly from moving `offsets` serialization from JM to TM?
{quote}
Yes, 68% of the time reduction comes from offloading offset serialization to the TM.
{quote}I'm wondering whether it would also make sense to move storing the handle from JM to TM and only return a pointer to it.
{quote}
I think that also makes sense, but this approach would make rescaling more difficult.
{quote}Do you have the numbers for RPC times?
{quote}
I haven't tracked the RPC processing time, so I don't have specific figures. My earlier conclusion that RPC processing took a long time came from the flame graphs.

[^flamegraph.control-group.html]

[^flamegraph.only-merge-handle.html]

[^flamegraph.merge-handle-and-serialize-on-tm.html]
{quote}As for ByteStreamStateHandle, we may need to change channel granular splitting to subtask granular splitting.
{quote}
What I originally wanted to say here is that with the merged channel state handle, we don't need to extract bytes for each channel. This may not be critical at this point, so we can leave it alone.

[~Zakelly] :
{quote}Overall it makes sense to offload file creation from JM to TM
{quote}
In fact, this PR does not offload the creation of the metadata file to the TM. It only reorganizes the handles and offloads the serialization of the offsets in each handle to the TM.

The test that produced the above results was conducted on Flink 1.16.1. I constructed a job with a parallelism of 2000 and deliberately limited the throughput of the sink operator to simulate backpressure. The following figure is the topology diagram of the test job.

!https://km.sankuai.com/api/file/cdn/2059636095/80577277714?contentType=1&isNewContent=false|width=670,height=104!
{quote}I'm curious about why the checkpoint time reduced by 36s (from 66s to 30s) when serializing in TM side while the metadata time is only 12s.
{quote}
The time saved on metadata serialization is only 12s, but the change also saves more time in RPC processing.
{quote}And how the long-tail of async duration happened, is it due to massive file creation requests and hotspot issue in NN node of HDFS?
{quote}
The test job ran on a public HDFS cluster shared with many other jobs, so there are many uncontrollable factors, and I did not investigate them carefully.
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818255#comment-17818255 ]

Zakelly Lan commented on FLINK-33734:
-------------------------------------

[~Feifan Wang] Sorry for the late reply as I was on vacation for a few weeks.

Overall it makes sense to offload file creation from JM to TM, especially in conjunction with the file merging mechanism in TM side introduced by FLIP-306. I think it would be better to show some detailed analysis of the performance improvement, along with an explanation of the implementation and the test setup.

I'm curious about why the checkpoint time reduced by 36s (from 66s to 30s) when serializing in TM side while the metadata time is only 12s. And how the long-tail of async duration happened, is it due to massive file creation requests and hotspot issue in NN node of HDFS?
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818251#comment-17818251 ]

Roman Khachatryan commented on FLINK-33734:
-------------------------------------------

As for
{quote}IIUC, ByteStreamStateHandle in each InputChannelStateHandle/ResultSubpartitionStateHandle is exclusive and uses a random UUID as the handle name. I just looked at this code and saw that FLINK-17972 was created while [~roman] were writing this code. I think the MergedInputChannelStateHandle mentioned above is an implementation of FLINK-17972.
{quote}
:D Yes, that's my understanding as well.
{quote}As for ByteStreamStateHandle, we may need to change channel granular splitting to subtask granular splitting. WDYT [~roman] ?{quote}
I'm not sure I understand. ByteStreamStateHandle is already created for subtask, and the bytes are extracted from it for every channel.
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818249#comment-17818249 ]

Roman Khachatryan commented on FLINK-33734:
-------------------------------------------

Thanks for the PR [~Feifan Wang]

Do I understand correctly, that the improvement now comes mostly from moving `offsets` serialization from JM to TM?

I'm wondering whether it would also make sense to move storing the handle from JM to TM and only return a pointer to it.

Do you have the numbers for RPC times?
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818246#comment-17818246 ]

Feifan Wang commented on FLINK-33734:
-------------------------------------

Hi [~fanrui] , I reran the test job and found some long "Async Duration" values. The checkpoint storage is HDFS, so I think the bottleneck now is long-tail latency.

!image-2024-02-18-15-12-20-665.png|width=860,height=200!
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17818227#comment-17818227 ]

Feifan Wang commented on FLINK-33734:
-------------------------------------

Sorry to keep you waiting [~fanrui] .

Producing a checkpoint involves multiple stages, and this PR only solves the problem of metadata expansion. Other stages may also take too long, such as uploading files to HDFS.

I haven't taken the time to investigate the bottleneck that remains after this PR; I can take a look if necessary.
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813874#comment-17813874 ]

Rui Fan commented on FLINK-33734:

Thanks [~Feifan Wang] for the effort. IIUC, the average checkpoint time is reduced from 82 s to 30 s, right? IMHO, that is still too long. WDYT?
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813646#comment-17813646 ]

Feifan Wang commented on FLINK-33734:

I found that merging handles alone is not enough. Although the metadata became noticeably smaller, the jobmanager still took a long time to process the RPCs and serialize the metadata. I suspect the reason is that, while merging handles avoids duplicate file paths, it does not significantly reduce the number of objects in the metadata (mainly channel infos). So I tried serializing the channel infos directly on the taskmanager side, and the test results show that this works well. Below are the results of my test:

{code:java}
# control group :
checkpoint time ( s )          --- avg: 82.73   max: 155.88  min: 49.90
store metadata time ( s )      --- avg: 23.45   max: 47.60   min: 9.61
metadata size ( MB )           --- avg: 696.55  max: 954.98  min: 461.35
store metadata speed ( MB/s )  --- avg: 32.41   max: 61.82   min: 18.29

# only merge handle :
checkpoint time ( s )          --- avg: 66.22   max: 123.12  min: 38.68
store metadata time ( s )      --- avg: 12.76   max: 26.18   min: 4.02
metadata size ( MB )           --- avg: 269.14  max: 394.26  min: 159.86
store metadata speed ( MB/s )  --- avg: 23.93   max: 46.55   min: 11.33

# not only merge handles, but also serialize channel infos on TaskManager :
checkpoint time ( s )          --- avg: 30.63   max: 74.27   min: 5.16
store metadata time ( s )      --- avg: 0.87    max: 11.23   min: 0.12
metadata size ( MB )           --- avg: 232.22  max: 392.86  min: 45.34
store metadata speed ( MB/s )  --- avg: 291.00  max: 386.80  min: 23.18
{code}

Based on these results, I think serializing channel infos on the taskmanager side should be done together with merging the handles. I submitted a PR that implements this solution; please have a look, [~pnowojski], [~fanrui], [~roman], [~Zakelly].
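To put the averages from the test results above into relative terms, the following small sketch computes percentage reductions against the control group. It is plain arithmetic on the reported numbers; the class and method names are made up for this illustration and are not part of the PR.

```java
import java.util.Locale;

// Illustrative arithmetic only. The inputs are the averages quoted in the
// test results; CheckpointGain and reductionPercent are hypothetical names.
final class CheckpointGain {

    /** Returns by how many percent `after` is smaller than `before`. */
    static double reductionPercent(double before, double after) {
        return (before - after) / before * 100.0;
    }

    public static void main(String[] args) {
        // Control group vs. "only merge handle".
        System.out.printf(Locale.ROOT, "merge only, checkpoint time: -%.1f%%%n",
                reductionPercent(82.73, 66.22));
        System.out.printf(Locale.ROOT, "merge only, metadata size:   -%.1f%%%n",
                reductionPercent(696.55, 269.14));

        // Control group vs. "merge handles and serialize channel infos on TaskManager".
        System.out.printf(Locale.ROOT, "merge + serialize, checkpoint time:     -%.1f%%%n",
                reductionPercent(82.73, 30.63));
        System.out.printf(Locale.ROOT, "merge + serialize, store metadata time: -%.1f%%%n",
                reductionPercent(23.45, 0.87));
    }
}
```

Read this way, merging alone shrinks the average metadata size by roughly 61% but the average checkpoint time by only about 20%, while additionally serializing channel infos on the taskmanager brings the average checkpoint time down by about 63% and the metadata store time by about 96%.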
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17796391#comment-17796391 ]

Piotr Nowojski commented on FLINK-33734:

{quote}
Yes, since multiple subtasks will reuse unaligned checkpoint files after the ISSUE is completed, merging handles between multiple subtasks can further reduce redundant data. But this may require changing the way the checkpoint metadata objects are organized. And this optimization is constant level, but merging handles within subtask can reduce the number of file paths from n^2 to n. So I'm not sure if merging handles between subtasks is worth it at this stage.
{quote}

+1 to merging within a subtask first. I doubt that we will need to merge state handles across subtasks, as there is already a lot of communication between the JM and each subtask, but we can evaluate it later if it still proves to be an issue.
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17795222#comment-17795222 ]

Feifan Wang commented on FLINK-33734:

Thanks [~roman]. Merging handles on the JM inside the MetadataSerializer can only reduce the size of the metadata file, and DFS capacity tends not to be an issue. In the production example I mentioned above, the main problem is that checkpointing takes too long. More specifically, we observed that the JM needed more than 40 seconds to process the checkpoint ack RPCs and more than 20 seconds to serialize the metadata object. So I still think that handles should be merged on the TM.

{quote}1. Does it make sense to also merge state from multiple subtasks (as implemented in FLINK-26803)?
{quote}
Yes. Since multiple subtasks will reuse unaligned checkpoint files once that issue (FLINK-26803) is completed, merging handles across subtasks can further reduce redundant data. But this may require changing the way the checkpoint metadata objects are organized. Moreover, that optimization only yields a constant-factor improvement, whereas merging handles within a subtask reduces the number of file paths from n^2 to n. So I'm not sure whether merging handles across subtasks is worth it at this stage.

{quote}2. What happens when the delegate is in-memory state handle (`ByteStreamStateHandle`)?
{quote}
IIUC, the ByteStreamStateHandle in each InputChannelStateHandle/ResultSubpartitionStateHandle is exclusive and uses a random UUID as the handle name. I just looked at this code and saw that FLINK-17972 was created while [~roman] was writing it. I think the MergedInputChannelStateHandle mentioned above is an implementation of FLINK-17972. As for ByteStreamStateHandle, we may need to change the splitting from channel granularity to subtask granularity. WDYT, [~roman]?
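The merge step discussed in this comment (grouping per-channel handles of one subtask by the file they point to, so that the path is stored once) could look roughly like the sketch below. All types here are simplified, hypothetical stand-ins invented for illustration; they are not Flink's InputChannelStateHandle or the classes from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins; these are NOT Flink's real classes.
final class MergeSketch {

    /** Per-channel handle: every instance repeats the file path. */
    static final class ChannelHandle {
        final String filePath;
        final int gateIdx;
        final int channelIdx;
        final long[] offsets;

        ChannelHandle(String filePath, int gateIdx, int channelIdx, long... offsets) {
            this.filePath = filePath;
            this.gateIdx = gateIdx;
            this.channelIdx = channelIdx;
            this.offsets = offsets;
        }
    }

    /** What is left of a channel once the file path has been factored out. */
    static final class ChannelEntry {
        final int gateIdx;
        final int channelIdx;
        final long[] offsets;

        ChannelEntry(int gateIdx, int channelIdx, long[] offsets) {
            this.gateIdx = gateIdx;
            this.channelIdx = channelIdx;
            this.offsets = offsets;
        }
    }

    /** One merged handle per distinct file: the path is stored exactly once. */
    static final class MergedHandle {
        final String filePath;
        final List<ChannelEntry> channels = new ArrayList<>();

        MergedHandle(String filePath) {
            this.filePath = filePath;
        }
    }

    /** Groups the per-channel handles of one subtask by the file they share. */
    static List<MergedHandle> merge(List<ChannelHandle> handles) {
        Map<String, MergedHandle> byFile = new LinkedHashMap<>();
        for (ChannelHandle h : handles) {
            byFile.computeIfAbsent(h.filePath, MergedHandle::new)
                    .channels.add(new ChannelEntry(h.gateIdx, h.channelIdx, h.offsets));
        }
        return new ArrayList<>(byFile.values());
    }
}
```

In this toy model the number of stored paths drops from one per channel to one per distinct file, which is the effect the merged handle is meant to achieve within a subtask.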
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794682#comment-17794682 ]

Roman Khachatryan commented on FLINK-33734:

Thanks for the proposal!

You're right [~Feifan Wang], we do need to add a new Metadata Serializer version.

Actually, I think it should be possible to solve the original problem (big _metadata file) by only changing how these state handles are (de)serialized, i.e. by merging them on the JM in the MetadataSerializer. That would of course be less efficient (as it wouldn't reduce the RPC message size), but the code might be simpler and the change easier to implement. To be clear, I personally don't have any preference.

As for recovery, it's not only about performance but also about RPC message size: with checkpointing improved (but not recovery), the RPC message size might be significantly larger on recovery than on checkpoint. We should probably document this (and add a feature toggle) until recovery is also optimized.

I have a couple more questions:
1. Does it make sense to also merge state from multiple subtasks (as implemented in [FLINK-26803|https://issues.apache.org/jira/browse/FLINK-26803])?
2. What happens when the delegate is an in-memory state handle (`ByteStreamStateHandle`)?
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794146#comment-17794146 ]

Feifan Wang commented on FLINK-33734:

We also want to improve the speed of recovery, but checkpointing time is currently the most important for us, so I completely agree with completing this in two steps.

When checking the source code, I found that the types of inputChannelState and resultSubpartitionState in OperatorSubtaskState are concrete classes instead of interfaces, and that the serialization of these two handles does not use a handle type flag like other handles do. This means that, in order to introduce MergedInputChannelStateHandle, we may need to introduce a MetadataV5Serializer. What do you think, [~pnowojski]? Or do you have any other suggestions?
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17794090#comment-17794090 ]

Piotr Nowojski commented on FLINK-33734:

Recovery times are very often important. But yes, obviously checkpointing time is more crucial :) If you would be able to improve both that would be great! Of course this could be done in two steps.
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793998#comment-17793998 ]

Feifan Wang commented on FLINK-33734:

Thanks to [~Zakelly], [~pnowojski] and [~fanrui] for participating in the discussion.

To [~Zakelly]:
Yes, this proposal only aims to reduce the metadata size of unaligned checkpoints. I also think that FLIP-306 does not solve the problems above. At the same time, I think my proposal can work together with FLIP-306.

To [~pnowojski]:
{quote}That sending out the RPCs during recovery will take a long time?
{quote}
Yes, in theory sending these RPCs during recovery also takes a long time, but we have not paid attention to it so far. First, from a business perspective our job can accept a recovery time of several minutes. Second, this kind of checkpoint only occurs under backpressure, and we have not tried to restore the job from such a checkpoint.
{quote}Wouldn't it be better to keep the state handles merged during recovery until they reach their destined subtasks on TMs?
{quote}
I share [~fanrui]'s view on this. Solving the problems of checkpoint creation and of recovery in two steps is acceptable to me.

To [~fanrui]:
{quote}Can we think the _metadata file size will be reduced 68% after this proposal?
{quote}
Yes, but only for checkpoints in which unaligned checkpoint handles account for the vast majority of the metadata, as in the example above.
{quote}How does flink serialize the MergedInputChannelStateHandle? Does it store the field name?
{quote}
The current serialization of metadata objects is compact, and field names are not saved in the file. The serialization of each handle is hardcoded.
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793770#comment-17793770 ]

Rui Fan commented on FLINK-33734:

Thanks [~Feifan Wang] for creating this Jira, and thanks [~pnowojski] for the feedback!
{quote}Doesn't this mean we will have a very similar problem during recovery?{quote}
Yes, you are right. I had an offline discussion with [~Feifan Wang] in advance. If the jobmanager converts MergedInputChannelStateHandle to an InputChannelStateHandle collection before assigning state handles, the whole recovery logic does not change, so this Jira is easy to implement.

Why do [~Feifan Wang] and I think the checkpoint duration deserves more attention than the recovery duration? Because checkpoints are much more frequent than recoveries. Of course, if we want to improve the recovery logic as well, that is fine with me. As I understand it, the MergedInputChannelStateHandle can be used directly when the parallelism isn't changed. When a rescale happens, the channel info should be stored together with the filePath during recovery.
{quote}jobmangager converts MergedInputChannelStateHandle to InputChannelStateHandle collection before assigning state handle{quote}
Could we treat this as stage one, and improve the recovery duration as stage two? Looking forward to more feedback from you, thanks~

Also, I have some questions about this proposal:
{quote}Of the 950MB in the metadata file, 68% are redundant file paths.{quote}
Can we assume the _metadata file size will be reduced by 68% after this proposal?
{quote}Structure of MergedInputChannelStateHandle :{quote}
How does Flink serialize the MergedInputChannelStateHandle? Does it store the field names? If we rename inputChannelIdx to idx, would that reduce the file size? IIUC, the idx occurs very frequently in the metadata. I'm wondering whether we could make other optimizations to simplify MergedInputChannelStateHandle while ensuring that InputChannelStateHandle can still be restored.
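The stage-one idea discussed here (merge on the taskmanager before reporting, expand on the jobmanager before assignment) could look roughly like the sketch below. All types in it (ChannelHandle, ChannelEntry, MergedHandle) are simplified stand-ins invented for this example, not the real Flink classes, and a plain String stands in for the delegated StreamStateHandle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of merge/expand; none of these types exist in Flink. */
final class MergeHandleSketch {

    /** Stand-in for a per-channel handle: the file path is held by every channel. */
    static final class ChannelHandle {
        final String filePath; final int gateIdx; final int inputChannelIdx;
        final List<Long> offsets; final long size;
        ChannelHandle(String filePath, int gateIdx, int inputChannelIdx, List<Long> offsets, long size) {
            this.filePath = filePath; this.gateIdx = gateIdx;
            this.inputChannelIdx = inputChannelIdx; this.offsets = offsets; this.size = size;
        }
    }

    /** Channel entry inside a merged handle: no file path of its own. */
    static final class ChannelEntry {
        final int gateIdx; final int inputChannelIdx; final List<Long> offsets; final long size;
        ChannelEntry(int gateIdx, int inputChannelIdx, List<Long> offsets, long size) {
            this.gateIdx = gateIdx; this.inputChannelIdx = inputChannelIdx;
            this.offsets = offsets; this.size = size;
        }
    }

    /** Stand-in for the merged handle: one file path shared by all its channels. */
    static final class MergedHandle {
        final String filePath; final long size; final List<ChannelEntry> channels;
        MergedHandle(String filePath, long size, List<ChannelEntry> channels) {
            this.filePath = filePath; this.size = size; this.channels = channels;
        }
    }

    /** Taskmanager side: group per-channel handles that share the same file. */
    static List<MergedHandle> merge(List<ChannelHandle> handles) {
        Map<String, List<ChannelEntry>> byFile = new LinkedHashMap<>();
        for (ChannelHandle h : handles) {
            byFile.computeIfAbsent(h.filePath, k -> new ArrayList<>())
                    .add(new ChannelEntry(h.gateIdx, h.inputChannelIdx, h.offsets, h.size));
        }
        List<MergedHandle> merged = new ArrayList<>();
        for (Map.Entry<String, List<ChannelEntry>> e : byFile.entrySet()) {
            long total = 0;
            for (ChannelEntry c : e.getValue()) {
                total += c.size; // merged size is the sum of the channel sizes
            }
            merged.add(new MergedHandle(e.getKey(), total, e.getValue()));
        }
        return merged;
    }

    /** Jobmanager side: expand back to per-channel handles before assignment. */
    static List<ChannelHandle> expand(MergedHandle m) {
        List<ChannelHandle> out = new ArrayList<>();
        for (ChannelEntry c : m.channels) {
            out.add(new ChannelHandle(m.filePath, c.gateIdx, c.inputChannelIdx, c.offsets, c.size));
        }
        return out;
    }
}
```

Because expand() restores the per-channel view, the existing assignment logic would not need to change in this sketch. The stage-two idea would instead ship MergedHandle to the subtasks unexpanded.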
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793706#comment-17793706 ]

Piotr Nowojski commented on FLINK-33734:

That's a good question [~Zakelly]. Apart from that:
{quote}jobmangager converts MergedInputChannelStateHandle to InputChannelStateHandle collection before assigning state handle, and the rest of the process does not need to be changed.{quote}
Doesn't this mean we will have a very similar problem during recovery? That sending out the RPCs during recovery will take a long time? Wouldn't it be better to keep the state handles merged during recovery until they reach their destined subtasks on TMs?
[jira] [Commented] (FLINK-33734) Merge unaligned checkpoint state handle
[ https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17793505#comment-17793505 ]

Zakelly Lan commented on FLINK-33734:

Hi [~Feifan Wang], I'm wondering whether this proposal aims only to reduce the metadata size, and whether [FLIP-306|https://cwiki.apache.org/confluence/display/FLINK/FLIP-306%3A+Unified+File+Merging+Mechanism+for+Checkpoints] could alleviate this problem. Also, can this proposal work together with FLIP-306?