[ https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
fanrui updated FLINK-19911:
---------------------------
    Comment: was deleted

(was: Flink job [code link|https://github.com/1996fanrui/fanrui-learning/blob/eb9ed4ef17374e865850987b63a97fa7d8ba6886/module-flink/src/main/java/com/dream/flink/io/BigStateIOOptimization.java])

> Read checkpoint stream with buffer to speedup restore
> -----------------------------------------------------
>
>                 Key: FLINK-19911
>                 URL: https://issues.apache.org/jira/browse/FLINK-19911
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems, Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.12.0, 1.11.3, 1.13.0
>         Environment: Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
>            Reporter: fanrui
>            Priority: Major
>
> Heap StateBackend has to serialize each Java object to the file system
> during a snapshot. RocksDB StateBackend's RocksFullSnapshotStrategy has to
> read key-value pairs from RocksDB and write them to the file system during a snapshot.
> Both cases involve many small I/O operations rather than a few large ones, and
> frequent small I/O is inefficient on disk. For this reason, the snapshot write path
> to the file system already uses a buffer. For details, refer to the buffer
> of {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
> The restore process also performs many small reads, but restore does not
> have a buffer. So I added a read buffer and tested it with a Flink job.
> h2. Flink Job environment:
> {code:java}
> Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1{code}
> Restore takes 33.1s without the read buffer and 12.8s with the read buffer.
> h2. How to do it?
> Use FSDataBufferedInputStream to wrap fsDataInputStream in
> HeapRestoreOperation#restore. Code:
> {code:java}
> FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
> FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);
> {code}
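
To illustrate why a read buffer helps during restore, here is a minimal, self-contained sketch. It deliberately avoids Flink classes: `java.io.BufferedInputStream` stands in for the proposed `FSDataBufferedInputStream`, an in-memory byte array stands in for the checkpoint file, and `CountingInputStream`, `readLongs`, and `underlyingReads` are hypothetical names introduced only for this example. It counts how many read calls reach the underlying stream when many small values are deserialized, with and without a buffer. It says nothing about the actual timings reported above.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

class BufferedRestoreSketch {

    /** Hypothetical helper: counts read calls that reach the underlying stream. */
    static final class CountingInputStream extends InputStream {
        private final InputStream delegate;
        int readCalls = 0;

        CountingInputStream(InputStream delegate) {
            this.delegate = delegate;
        }

        @Override
        public int read() throws IOException {
            readCalls++;
            return delegate.read();
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            readCalls++;
            return delegate.read(b, off, len);
        }
    }

    /** Deserializes {@code count} longs one at a time, like many small state reads. */
    static long readLongs(InputStream in, int count) throws IOException {
        DataInputStream data = new DataInputStream(in);
        long sum = 0;
        for (int i = 0; i < count; i++) {
            sum += data.readLong();
        }
        return sum;
    }

    /** Returns how many read calls hit the raw stream, with or without a buffer. */
    static int underlyingReads(byte[] state, int count, boolean buffered) throws IOException {
        CountingInputStream raw = new CountingInputStream(new ByteArrayInputStream(state));
        // Assumption: a 64 KiB buffer; the size used in the real change may differ.
        InputStream in = buffered ? new BufferedInputStream(raw, 64 * 1024) : raw;
        readLongs(in, count);
        return raw.readCalls;
    }

    public static void main(String[] args) throws IOException {
        int count = 100_000;
        byte[] state = new byte[count * Long.BYTES]; // stand-in for serialized state
        System.out.println("unbuffered reads: " + underlyingReads(state, count, false));
        System.out.println("buffered reads:   " + underlyingReads(state, count, true));
    }
}
```

Without the buffer, every `readLong` turns into at least one read on the raw stream. With the buffer, the raw stream is only touched when the buffer needs refilling, which is the effect the proposed wrapper aims for on a real file system stream.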