[https://issues.apache.org/jira/browse/FLINK-33734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17813646#comment-17813646]

Feifan Wang commented on FLINK-33734:
-------------------------------------

I found that merging handles alone was not enough. Although the metadata was 
noticeably smaller, the JobManager still took a long time to process the RPCs 
and serialize the metadata. I suspect the reason is that, although merging 
handles avoids duplicate file paths, it does not significantly reduce the 
number of objects in the metadata (mostly channel infos). So I tried 
serializing the channel infos directly on the TaskManager side, and the test 
results show that this works well. Below are the results of my test:
{code:java}
# control group:
checkpoint time       (s)    --- avg: 82.73    max: 155.88   min: 49.90
store metadata time   (s)    --- avg: 23.45    max: 47.60    min: 9.61
metadata size         (MB)   --- avg: 696.55   max: 954.98   min: 461.35
store metadata speed  (MB/s) --- avg: 32.41    max: 61.82    min: 18.29

# only merging handles:
checkpoint time       (s)    --- avg: 66.22    max: 123.12   min: 38.68
store metadata time   (s)    --- avg: 12.76    max: 26.18    min: 4.02
metadata size         (MB)   --- avg: 269.14   max: 394.26   min: 159.86
store metadata speed  (MB/s) --- avg: 23.93    max: 46.55    min: 11.33

# merging handles and also serializing channel infos on the TaskManager:
checkpoint time       (s)    --- avg: 30.63    max: 74.27    min: 5.16
store metadata time   (s)    --- avg: 0.87     max: 11.23    min: 0.12
metadata size         (MB)   --- avg: 232.22   max: 392.86   min: 45.34
store metadata speed  (MB/s) --- avg: 291.00   max: 386.80   min: 23.18{code}
Based on these results, I think serializing channel infos on the TaskManager 
side should be done together with merging the handles. I have submitted a PR 
implementing this solution; please take a look, [~pnowojski], [~fanrui], 
[~roman], [~Zakelly].
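A minimal sketch of the idea of serializing channel infos on the TaskManager side. The class `ChannelInfoSerializationSketch`, the record `ChannelInfo` and the byte layout are hypothetical and are not the code in the PR; the fields simply mirror the JSON structure in the issue description (gate index, channel index, offsets, size). Packing them into one byte array means the JobManager receives and stores a single opaque array per handle instead of one Java object per channel, and only expands it when recovering.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch, not Flink's real classes or wire format. */
public class ChannelInfoSerializationSketch {

    /** Simplified per-channel metadata, mirroring the JSON example in the issue. */
    public record ChannelInfo(int gateIdx, int inputChannelIdx, long[] offsets, long size) {}

    /** TaskManager side: pack all channel infos into a single byte array. */
    public static byte[] serialize(List<ChannelInfo> infos) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(infos.size());
            for (ChannelInfo info : infos) {
                out.writeInt(info.gateIdx());
                out.writeInt(info.inputChannelIdx());
                out.writeInt(info.offsets().length); // length prefix for the offsets
                for (long offset : info.offsets()) {
                    out.writeLong(offset);
                }
                out.writeLong(info.size());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Recovery side: expand the byte array back into per-channel objects. */
    public static List<ChannelInfo> deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            List<ChannelInfo> infos = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                int gateIdx = in.readInt();
                int channelIdx = in.readInt();
                long[] offsets = new long[in.readInt()];
                for (int j = 0; j < offsets.length; j++) {
                    offsets[j] = in.readLong();
                }
                infos.add(new ChannelInfo(gateIdx, channelIdx, offsets, in.readLong()));
            }
            return infos;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<ChannelInfo> infos = List.of(
                new ChannelInfo(0, 0, new long[] {100, 200, 300, 400}, 1400),
                new ChannelInfo(0, 1, new long[] {500, 600}, 600));
        byte[] blob = serialize(infos);
        List<ChannelInfo> restored = deserialize(blob);
        System.out.println(blob.length + " bytes, " + restored.size() + " channels");
    }
}
```

Running `main` prints `92 bytes, 2 channels`: 4 bytes for the channel count, plus 52 and 36 bytes for the two channels.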

> Merge unaligned checkpoint state handle
> ---------------------------------------
>
>                 Key: FLINK-33734
>                 URL: https://issues.apache.org/jira/browse/FLINK-33734
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Feifan Wang
>            Assignee: Feifan Wang
>            Priority: Major
>              Labels: pull-request-available
>
> h3. Background
> During an unaligned checkpoint, the in-flight data of all InputChannels and 
> ResultSubpartitions of a subtask is written to the same file. 
> InputChannelStateHandle and ResultSubpartitionStateHandle organize the 
> metadata of this in-flight data at channel granularity, so the same file 
> name is repeated many times. When a job is under backpressure and task 
> parallelism is high, the metadata of unaligned checkpoints bloats. This 
> results in:
>  # The amount of data reported by the TaskManagers to the JobManager 
> increases, and the JobManager takes longer to process these RPC requests.
>  # The metadata of the entire checkpoint becomes very large, and it takes 
> longer to serialize it and write it to DFS.
> Both of the above points ultimately lead to longer checkpoint durations.
> h3. A production example
> Take our production job with a parallelism of 4800 as an example:
>  # Without backpressure, the end-to-end checkpoint duration is within 7 
> seconds.
>  # Under backpressure, the end-to-end checkpoint duration often exceeds 1 
> minute. We found that the JobManager took more than 40 seconds to process RPC 
> requests, and serializing the metadata took more than 20 seconds. Some 
> checkpoint statistics:
> |metadata file size|950 MB|
> |channel state count|12,229,854|
> |channel file count|5536|
> Of the 950 MB in the metadata file, 68% consists of redundant file paths.
> We enabled log-based checkpointing on this job in the hope of completing 
> checkpoints within 30 seconds; this problem made that goal difficult to 
> achieve.
> h3. Proposed changes
> I suggest introducing MergedInputChannelStateHandle and 
> MergedResultSubpartitionStateHandle to eliminate redundant file paths.
> Before reporting, the TaskManager merges all InputChannelStateHandles of the 
> same subtask that share the same delegated StreamStateHandle into one 
> MergedInputChannelStateHandle. When recovering from a checkpoint, the 
> JobManager converts each MergedInputChannelStateHandle back into a collection 
> of InputChannelStateHandles before assigning state handles, so the rest of 
> the process does not need to change.
> Structure of MergedInputChannelStateHandle:
>  
> {code:java}
> {   // MergedInputChannelStateHandle
>     "delegate": {
>         "filePath": "viewfs://hadoop-meituan/flink-yg15/checkpoints/retained/1234567/ab8d0c2f02a47586490b15e7a2c30555/chk-31/ffe54c0a-9b6e-4724-aae7-61b96bf8b1cf",
>         "stateSize": 123456
>     },
>     "size": 2000,
>     "subtaskIndex":0,
>     "channels": [ // One InputChannel per element
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 0
>             },
>             "offsets": [
>                 100,200,300,400
>             ],
>             "size": 1400
>         },
>         {
>             "info": {
>                 "gateIdx": 0,
>                 "inputChannelIdx": 1
>             },
>             "offsets": [
>                 500,600
>             ],
>             "size": 600
>         }
>     ]
> }
>  {code}
> MergedResultSubpartitionStateHandle is similar.
>  
>  
> WDYT [~roman] , [~pnowojski] , [~fanrui] ?
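The merge and unmerge steps proposed in the quoted description can be sketched with simplified, hypothetical types (`MergeHandlesSketch`, `ChannelHandle`, `ChannelEntry`, `MergedHandle`); they are not Flink's `InputChannelStateHandle` or the proposed `MergedInputChannelStateHandle`. As an assumption for brevity, handles are grouped by their file path string rather than by the delegated `StreamStateHandle`, and the size fields of the merged handle are omitted.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model of the proposal; not Flink's real classes. */
public class MergeHandlesSketch {

    /** Per-channel handle: every instance repeats the file path. */
    public record ChannelHandle(String filePath, int gateIdx, int channelIdx, List<Long> offsets, long size) {}

    /** Per-channel entry inside a merged handle: carries no file path. */
    public record ChannelEntry(int gateIdx, int channelIdx, List<Long> offsets, long size) {}

    /** One handle per (subtask, file): the path is stored exactly once. */
    public record MergedHandle(String filePath, int subtaskIndex, List<ChannelEntry> channels) {}

    /** TaskManager side: group one subtask's handles by the file they point to. */
    public static List<MergedHandle> merge(int subtaskIndex, List<ChannelHandle> handles) {
        // LinkedHashMap keeps the order in which files are first seen.
        Map<String, List<ChannelEntry>> byFile = new LinkedHashMap<>();
        for (ChannelHandle h : handles) {
            byFile.computeIfAbsent(h.filePath(), k -> new ArrayList<>())
                    .add(new ChannelEntry(h.gateIdx(), h.channelIdx(), h.offsets(), h.size()));
        }
        List<MergedHandle> merged = new ArrayList<>();
        byFile.forEach((path, entries) -> merged.add(new MergedHandle(path, subtaskIndex, entries)));
        return merged;
    }

    /** Recovery side: expand merged handles back into per-channel handles. */
    public static List<ChannelHandle> unmerge(List<MergedHandle> merged) {
        List<ChannelHandle> handles = new ArrayList<>();
        for (MergedHandle m : merged) {
            for (ChannelEntry e : m.channels()) {
                handles.add(new ChannelHandle(m.filePath(), e.gateIdx(), e.channelIdx(), e.offsets(), e.size()));
            }
        }
        return handles;
    }

    public static void main(String[] args) {
        String file = "chk-31/ffe54c0a"; // hypothetical short path
        List<ChannelHandle> handles = List.of(
                new ChannelHandle(file, 0, 0, List.of(100L, 200L, 300L, 400L), 1400),
                new ChannelHandle(file, 0, 1, List.of(500L, 600L), 600));
        List<MergedHandle> merged = merge(0, handles);
        System.out.println(merged.size() + " merged handle(s), round trip ok: "
                + unmerge(merged).equals(handles));
    }
}
```

Running `main` prints `1 merged handle(s), round trip ok: true`: two channel handles that share one file collapse into a single merged handle that stores the path once, and unmerging restores the original per-channel handles, so code downstream of the conversion can stay unchanged.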



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
