[ https://issues.apache.org/jira/browse/FLINK-26803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17522304#comment-17522304 ]
fanrui commented on FLINK-26803:
--------------------------------

Hi [~akalashnikov], thanks for your reply.

In fact, there is only one immediate benefit: reducing the number of small files. To understand why that matters, it helps to list the problems that too many small files cause:
# High pressure on the HDFS NameNode (NN).
# If UC (Unaligned Checkpoint) is enabled for all Flink jobs and an external component such as Kafka slows down, a large number of Flink jobs write to Kafka slowly. This leads to severe back pressure, and many jobs may write a large number of small files at the same time. That may cause an HDFS NN avalanche.

Some of our important jobs need the CP (checkpoint) to succeed even when back pressure is severe. After research, we found that UC can solve this problem. However, because of the small file problem, we dare not enable it for a large number of jobs. So I think solving the small file problem mainly improves the availability and stability of jobs. Of course, I'd love to hear more from other Flink users.

By the way, I know about [state.storage.fs.memory-threshold|https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-storage-fs-memory-threshold], but I think the combined size of the input and output channel state is almost always much larger than 20KB. For Flink jobs with large parallelism, this size is usually more than 1MB.

Regarding the code implementation, I have some ideas that I'd like to discuss with you. The proposal is to add a ChannelStateFileManager:
* During initialization, every Task registers with the ChannelStateFileManager.
* When checkpointing, each Task writes its channel state through the ChannelStateFileManager, which returns the path, offset and length.
* Since the ChannelStateFileManager knows all the Tasks, it knows whether all Tasks have finished for the current checkpointId.

Please correct me if anything is wrong. I look forward to discussing this with you in depth.

Thanks.
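To make the ChannelStateFileManager idea described above more concrete, here is a minimal sketch. It is only an illustration under assumptions, not Flink code: the method names (register, write, isComplete), the Handle class and the path layout are hypothetical, and an in-memory buffer stands in for the shared file on the FileSystem.

```java
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: none of these names exist in Flink.
final class ChannelStateFileManager {

    /** Where one Task's channel state lives inside the shared file. */
    static final class Handle {
        final String path;
        final long offset;
        final long length;

        Handle(String path, long offset, long length) {
            this.path = path;
            this.offset = offset;
            this.length = length;
        }
    }

    private final String basePath;
    private final Set<String> registeredTasks = new HashSet<>();
    // One shared buffer per checkpointId; stands in for a real output stream.
    private final Map<Long, ByteArrayOutputStream> files = new HashMap<>();
    // Tasks that have already written their state for a given checkpointId.
    private final Map<Long, Set<String>> finishedTasks = new HashMap<>();

    ChannelStateFileManager(String basePath) {
        this.basePath = basePath;
    }

    /** Called once per Task during initialization. */
    synchronized void register(String taskId) {
        registeredTasks.add(taskId);
    }

    /** Appends a Task's channel state to the shared file and returns its location. */
    synchronized Handle write(long checkpointId, String taskId, byte[] data) {
        if (!registeredTasks.contains(taskId)) {
            throw new IllegalStateException("Task not registered: " + taskId);
        }
        ByteArrayOutputStream out =
                files.computeIfAbsent(checkpointId, id -> new ByteArrayOutputStream());
        long offset = out.size();
        out.write(data, 0, data.length);
        finishedTasks.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(taskId);
        // Assumed path layout, for illustration only.
        String path = basePath + "/chk-" + checkpointId + "/channel-state";
        return new Handle(path, offset, data.length);
    }

    /** True once every registered Task has written for this checkpointId. */
    synchronized boolean isComplete(long checkpointId) {
        Set<String> done = finishedTasks.get(checkpointId);
        return done != null && done.containsAll(registeredTasks);
    }
}
```

In this sketch, all Tasks registered with the same manager share one file per checkpoint, so the file count no longer grows with the number of Tasks. A real implementation would also need to handle aborted checkpoints, Tasks that finish early, and write failures, which are left out here.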
> Merge small ChannelState file for Unaligned Checkpoint
> ------------------------------------------------------
>
>                 Key: FLINK-26803
>                 URL: https://issues.apache.org/jira/browse/FLINK-26803
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing, Runtime / Network
>            Reporter: fanrui
>            Priority: Major
>
> When making an unaligned checkpoint, the number of ChannelState files is
> TaskNumber * subtaskNumber. A high-parallelism job therefore writes too many
> small files, which causes high load on the HDFS NN.
>
> In our production, a job writes more than 50K small files for each Unaligned
> Checkpoint. Could we merge these files before writing them to the FileSystem?
> We could make the maximum number of files each TM can write in a single
> Unaligned Checkpoint configurable.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)