[ https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-19911:
-----------------------------------
    Labels: auto-deprioritized-major auto-deprioritized-minor auto-unassigned pull-request-available  (was: auto-deprioritized-major auto-unassigned pull-request-available stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates, so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> add read buffer for input stream
> --------------------------------
>
>                 Key: FLINK-19911
>                 URL: https://issues.apache.org/jira/browse/FLINK-19911
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems, Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.11.3, 1.12.0, 1.13.0
>         Environment: Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
>            Reporter: future
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, auto-unassigned, pull-request-available
>
> During a snapshot, the Heap StateBackend needs to serialize each Java object to the file system, and the RocksDB StateBackend's RocksFullSnapshotStrategy needs to read key-value pairs from RocksDB and write them to the file system.
> Both cases involve many small I/Os rather than a few large ones, and frequent small I/O is not friendly to the disk. For this reason, checkpoint snapshots are written to the file system through a buffer; for details, see the buffer in {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
> The restore process also performs many small I/Os, but it has no read buffer. So I added a read buffer and tested it with a Flink job.
> h2. Flink Job environment:
> {code:java}
> Flink version: 1.10
> StateBackend : FsStateBackend
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1{code}
> Restoring takes 33.1 s without the read buffer and 12.8 s with it.
> h2. How to do it?
> Wrap {{fsDataInputStream}} in an {{FSDataBufferedInputStream}} in {{HeapRestoreOperation#restore}}:
> {code:java}
> // Open the raw stream of the key-group state handle being restored...
> FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
> // ...and wrap it so that the many small reads are served from an in-memory buffer.
> FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);
> {code}
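To illustrate why a read buffer helps when state is deserialized record by record, here is a minimal sketch using plain `java.io` only. It is not the patch: it assumes the JDK's `java.io.BufferedInputStream` as a stand-in for `FSDataBufferedInputStream`, an in-memory `ByteArrayInputStream` as a stand-in for the checkpoint file, and a hypothetical `CountingInputStream` helper that counts how many read calls reach the underlying stream while small 4-byte records are read back one at a time.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadBufferDemo {

    /** Hypothetical helper: counts read calls that reach the wrapped stream. */
    static final class CountingInputStream extends FilterInputStream {
        int readCalls;

        CountingInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read() throws IOException {
            readCalls++;
            return super.read();
        }

        // FilterInputStream.read(byte[]) delegates here, so each call is counted once.
        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            readCalls++;
            return super.read(b, off, len);
        }
    }

    /** Serializes the ints 0..count-1 as 4-byte records (stand-in for a snapshot file). */
    public static byte[] serializeInts(int count) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < count; i++) {
                out.writeInt(i);
            }
        }
        return bytes.toByteArray();
    }

    /**
     * Reads the records back one at a time and returns how many read calls reached
     * the underlying stream. bufferSize <= 0 means "no read buffer".
     */
    public static int countUnderlyingReads(int count, int bufferSize) throws IOException {
        CountingInputStream raw =
                new CountingInputStream(new ByteArrayInputStream(serializeInts(count)));
        InputStream source = bufferSize > 0 ? new BufferedInputStream(raw, bufferSize) : raw;
        try (DataInputStream in = new DataInputStream(source)) {
            for (int i = 0; i < count; i++) {
                // Each record is a tiny read, like deserializing one state entry.
                if (in.readInt() != i) {
                    throw new IllegalStateException("unexpected value at record " + i);
                }
            }
        }
        return raw.readCalls;
    }

    public static void main(String[] args) throws IOException {
        int count = 100_000; // 400,000 bytes of 4-byte records
        System.out.println("unbuffered underlying reads: " + countUnderlyingReads(count, 0));
        System.out.println("buffered underlying reads:   " + countUnderlyingReads(count, 8192));
    }
}
```

Without the buffer, every record costs at least one call on the underlying stream (the exact number depends on how the JDK implements `DataInputStream.readInt`); with an 8 KiB buffer, the underlying stream is hit roughly once per 8 KiB. Note that a real wrapper around Flink's `FSDataInputStream` must also keep `getPos()` consistent and discard or reposition its buffer on `seek()`, which this sketch does not cover.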