[ https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522304#comment-17522304 ]

fanrui commented on FLINK-26803:
--------------------------------

Hi [~akalashnikov], thanks for your reply.

In fact, there is only one immediate benefit: reducing the number of small 
files.

To understand the advantages, I think we should first ask: what problems do too many small files cause?
 # High pressure on the HDFS NameNode (NN).
 # If UC (Unaligned Checkpoint) is enabled for all Flink jobs and an external 
component such as Kafka slows down, many Flink jobs write to Kafka slowly at 
the same time. This leads to severe back pressure, so many jobs may write a 
large number of small files simultaneously, which can trigger an avalanche 
failure of the HDFS NN.

For some of our important jobs, we want checkpoints (CP) to succeed even under 
severe back pressure. After some research, we found that UC solves this 
problem. However, because of the small-file problem, we don't dare to enable it 
for a large number of jobs. So I think solving the small-file problem mainly 
improves the availability and stability of jobs. Of course, I'd love to hear 
more from other Flink users.

By the way, I'm aware of 
[state.storage.fs.memory-threshold|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-storage-fs-memory-threshold],
 but I think the combined size of input and output channel state is always much 
larger than 20KB. For high-parallelism Flink jobs, this size is usually more 
than 1MB.
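To make the threshold argument concrete, here is a minimal sketch. It assumes, as the configuration documentation describes, that state chunks smaller than `state.storage.fs.memory-threshold` are stored inline in the checkpoint metadata while larger ones are written as separate files. The class name, method name, and sample sizes are hypothetical and only illustrate why channel state of around 1MB per subtask always ends up as files.

```java
import java.util.Arrays;

/** Hypothetical illustration of the inline-vs-file decision; not Flink's actual code. */
public class MemoryThresholdSketch {

    /** The 20KB threshold discussed above (state.storage.fs.memory-threshold). */
    public static final long MEMORY_THRESHOLD_BYTES = 20L * 1024;

    /**
     * Counts the subtasks whose channel state (input + output) would be written
     * to its own file instead of being inlined into the checkpoint metadata.
     * Simplified: exact boundary handling of the real implementation is not modeled.
     */
    public static long countStateFiles(long[] channelStateBytesPerSubtask, long thresholdBytes) {
        return Arrays.stream(channelStateBytesPerSubtask)
                .filter(size -> size > thresholdBytes)
                .count();
    }
}
```

With per-subtask sizes of 1MB, 1MB and 5KB against the 20KB threshold, only the 5KB chunk would be inlined; the other two subtasks each produce a file, so raising the threshold slightly does not help jobs whose channel state is routinely in the megabyte range.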

 

Regarding the implementation, I have some ideas that I'd like to discuss with 
you.

Add a ChannelStateFileManager:
 * During initialization, every Task registers with the 
ChannelStateFileManager.
 * When checkpointing, every Task writes its channel state through the 
ChannelStateFileManager, which returns the path, offset and length of that 
Task's data.
 * Since the ChannelStateFileManager knows all the Tasks, it knows when all 
Tasks have finished writing for the current checkpointId.
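The three steps of the proposed ChannelStateFileManager can be sketched as follows. This is a hypothetical, in-memory illustration, not Flink's API: the class, `StateHandle`, the method names, and the path layout are all invented here, and a `ByteArrayOutputStream` stands in for the merged file that a real implementation would write to the checkpoint FileSystem. It assumes one manager instance per scope (for example per TaskManager), which the proposal does not specify.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch: tasks append channel state to one merged "file" per checkpoint. */
public class ChannelStateFileManagerSketch {

    /** Location of one task's channel state inside the merged file. */
    public static final class StateHandle {
        public final String path;
        public final long offset;
        public final long length;

        StateHandle(String path, long offset, long length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }
    }

    private final Set<String> registeredTasks = new HashSet<>();
    // Stand-in for the real merged file, one per checkpointId.
    private final Map<Long, ByteArrayOutputStream> mergedFiles = new HashMap<>();
    private final Map<Long, Set<String>> finishedTasks = new HashMap<>();

    /** Step 1: each task registers once during initialization. */
    public synchronized void register(String taskId) {
        registeredTasks.add(taskId);
    }

    /** Step 2: a task appends its channel state and gets back (path, offset, length). */
    public synchronized StateHandle write(long checkpointId, String taskId, byte[] channelState) {
        if (!registeredTasks.contains(taskId)) {
            throw new IllegalStateException("Task not registered: " + taskId);
        }
        ByteArrayOutputStream file =
                mergedFiles.computeIfAbsent(checkpointId, id -> new ByteArrayOutputStream());
        long offset = file.size();
        file.write(channelState, 0, channelState.length);
        finishedTasks.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(taskId);
        // Hypothetical path layout: one merged file per checkpoint for this manager.
        return new StateHandle("chk-" + checkpointId + "/merged-channel-state", offset, channelState.length);
    }

    /** Step 3: true once every registered task has written its state for this checkpoint. */
    public synchronized boolean allTasksFinished(long checkpointId) {
        Set<String> done = finishedTasks.get(checkpointId);
        return done != null && done.containsAll(registeredTasks);
    }
}
```

Because all tasks share one file per checkpoint, the offset returned to each task is simply the file size before its append; once `allTasksFinished` returns true, a real manager could close the file and report it as a single object to the FileSystem.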

Please correct me if anything is wrong. I look forward to discussing this with 
you in depth. Thanks.

> Merge small ChannelState file for Unaligned Checkpoint
> ------------------------------------------------------
>
>                 Key: FLINK-26803
>                 URL: https://issues.apache.org/jira/browse/FLINK-26803
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Network
>            Reporter: fanrui
>            Priority: Major
>
> When taking an unaligned checkpoint, the number of ChannelState files is 
> TaskNumber * subtaskNumber. For high-parallelism jobs, this means writing too 
> many small files, which puts a high load on the HDFS NN.
>  
> In our production environment, one job writes more than 50K small files per 
> Unaligned Checkpoint. Could we merge these files before writing them to the 
> FileSystem? We could make the maximum number of files each TM can write in a 
> single Unaligned Checkpoint configurable.
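The file-count arithmetic in the issue description can be sketched as below. The class, the method names, and the example numbers (10 tasks at parallelism 5000, 500 TaskManagers, a cap of 1 file per TM) are hypothetical; the cap models the proposed "maximum number of files each TM can write" option, and since TMs with fewer subtasks than the cap write fewer files, the capped figure is an upper bound.

```java
import java.util.Arrays;

/** Hypothetical arithmetic for channel state file counts per unaligned checkpoint. */
public class FileCountSketch {

    /** Without merging: one file per subtask, i.e. the sum of parallelism over all tasks. */
    public static long filesWithoutMerge(int[] parallelismPerTask) {
        return Arrays.stream(parallelismPerTask).asLongStream().sum();
    }

    /** With a per-TM cap: each TM merges its subtasks' state into at most maxFilesPerTm files. */
    public static long maxFilesWithPerTmCap(long subtaskFiles, int numTaskManagers, int maxFilesPerTm) {
        return Math.min(subtaskFiles, (long) numTaskManagers * maxFilesPerTm);
    }
}
```

With 10 tasks at parallelism 5000, `filesWithoutMerge` gives 50,000 files per checkpoint, in line with the 50K reported above; capping each of 500 TMs at one file would bring that down to at most 500.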



--
This message was sent by Atlassian Jira
(v8.20.1#820001)