[ 
https://issues.apache.org/jira/browse/FLINK-11937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Congxian Qiu(klion26) updated FLINK-11937:
------------------------------------------
    Description: 
Currently, when incremental checkpointing is enabled in RocksDBStateBackend, a 
separate file is generated on DFS for each sst file. This can cause a “file 
flood” when running intensive workloads (many jobs with high parallelism) in a 
big cluster. According to our observations in Alibaba production, such a file 
flood introduces at least two drawbacks when HDFS is used as the checkpoint 
storage FileSystem: 1) a huge number of RPC requests is issued to the NameNode 
(NN), which may overflow its response queue; 2) the huge number of files puts 
heavy pressure on the NN’s on-heap memory.

Flink has faced a similar small-file flood problem before and tried to resolve 
it by introducing ByteStreamStateHandle (FLINK-2808). That solution has a 
limitation, though: if the threshold is configured too low there will still be 
too many small files, while if it is configured too high the JM will eventually 
OOM. It therefore can hardly resolve the issue when RocksDBStateBackend is used 
with the incremental snapshot strategy.
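
For reference, the threshold mentioned above is the state backend's 
"state.backend.fs.memory-threshold" option: state chunks smaller than it are 
shipped to the JM inline as a ByteStreamStateHandle instead of being written as 
separate DFS files. Below is a minimal, hedged example of tuning it; the class 
name and the chosen value are purely illustrative (in practice the key is 
usually set in flink-conf.yaml), and the default is on the order of 1 KB in 
the affected versions.

{code:java}
import org.apache.flink.configuration.Configuration;

public class MemoryThresholdExample {
    public static void main(String[] args) {
        Configuration config = new Configuration();
        // State chunks smaller than this many bytes are returned to the JM
        // inline (ByteStreamStateHandle) instead of as separate DFS files.
        // 64 KB is an illustrative choice, not a recommendation.
        config.setInteger("state.backend.fs.memory-threshold", 64 * 1024);
        // Trade-off described above: a larger value means fewer small DFS
        // files, but every inlined chunk occupies JM heap and RPC payload,
        // which is the OOM risk.
    }
}
{code}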

We propose a new OutputStream called 
FileSegmentCheckpointStateOutputStream (FSCSOS) to fix the problem. FSCSOS will 
reuse the same underlying distributed file until its size exceeds a preset 
threshold. We plan to complete the work in three steps: first, introduce 
FSCSOS; second, resolve the specific storage amplification issue of FSCSOS; 
and finally, add an option to reuse FSCSOS across multiple checkpoints to 
further reduce the number of DFS files. A rough sketch of the segment-writing 
idea follows below.
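
To make the intended behavior concrete, here is a minimal, standalone Java 
sketch of the segment-writing idea. It is not the actual FSCSOS from the 
design doc and deliberately avoids Flink's checkpoint stream interfaces; all 
names are illustrative assumptions. Every logical state write is appended to a 
shared physical file, the returned handle records the segment's (file, offset, 
length), and the shared file is rolled once it grows past a preset threshold.

{code:java}
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Illustrative only: many logical state objects share one physical file. */
public class SharedFileSegmentWriter {

    /** Describes where one logical state object lives inside a shared file. */
    public static final class SegmentHandle {
        final Path file;
        final long offset;
        final long length;

        SegmentHandle(Path file, long offset, long length) {
            this.file = file;
            this.offset = offset;
            this.length = length;
        }
    }

    private final Path directory;
    private final long rollThresholdBytes; // preset threshold before a new file is opened
    private Path currentFile;
    private OutputStream currentOut;
    private long currentSize;

    public SharedFileSegmentWriter(Path directory, long rollThresholdBytes) {
        this.directory = directory;
        this.rollThresholdBytes = rollThresholdBytes;
    }

    /** Appends one logical segment (e.g. the bytes of one sst file) to the shared file. */
    public synchronized SegmentHandle writeSegment(byte[] data) throws IOException {
        if (currentOut == null || currentSize >= rollThresholdBytes) {
            roll(); // reuse the same file until it exceeds the threshold, then start a new one
        }
        long offset = currentSize;
        currentOut.write(data);
        currentOut.flush();
        currentSize += data.length;
        return new SegmentHandle(currentFile, offset, data.length);
    }

    private void roll() throws IOException {
        close();
        currentFile = directory.resolve("shared-" + System.nanoTime());
        currentOut = Files.newOutputStream(
                currentFile, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        currentSize = 0L;
    }

    public synchronized void close() throws IOException {
        if (currentOut != null) {
            currentOut.close();
            currentOut = null;
        }
    }
}
{code}

On a real DFS the restore path would read back only the (offset, length) slice 
of the shared file, and file rolling and cleanup would have to be coordinated 
with checkpoint completion and subsumption, which is what the second and third 
steps above are meant to address.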

For more details, please refer to the attached design doc.


> Resolve small file problem in RocksDB incremental checkpoint
> ------------------------------------------------------------
>
>                 Key: FLINK-11937
>                 URL: https://issues.apache.org/jira/browse/FLINK-11937
>             Project: Flink
>          Issue Type: New Feature
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.7.2
>            Reporter: Congxian Qiu(klion26)
>            Assignee: Congxian Qiu(klion26)
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
