[ 
https://issues.apache.org/jira/browse/FLINK-19911?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

fanrui updated FLINK-19911:
---------------------------
    Description: 
During a snapshot, the Heap StateBackend serializes each Java object into the file system, and the RocksDB StateBackend's RocksFullSnapshotStrategy reads key-value pairs from RocksDB and writes them to the file system.

Both cases generate many small I/O operations rather than a few large ones, and frequent small I/O is unfriendly to the disk. For this reason, the checkpoint snapshot write path to the file system is already buffered; for details, see the buffer in {{FsCheckpointStreamFactory.FsCheckpointStateOutputStream}}.
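The effect of a write buffer can be illustrated with plain java.io classes. This is a minimal sketch, not Flink's FsCheckpointStateOutputStream: CountingOutputStream is a hypothetical stand-in for the file system stream that only counts write requests, and a 4 KiB BufferedOutputStream (an arbitrary size) plays the role of the checkpoint write buffer. Each record is a 4-byte key plus an 8-byte value.

```java
import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class WriteBufferDemo {

    /** Hypothetical stand-in for a file system stream: discards bytes but counts write requests. */
    static final class CountingOutputStream extends OutputStream {
        long writeCalls = 0;

        @Override
        public void write(int b) {
            writeCalls++;
        }

        @Override
        public void write(byte[] b, int off, int len) {
            writeCalls++;
        }
    }

    /** Serializes `records` (int key, long value) pairs and returns how many writes reached the sink. */
    public static long countWrites(int records, boolean buffered) throws IOException {
        CountingOutputStream sink = new CountingOutputStream();
        OutputStream target = buffered ? new BufferedOutputStream(sink, 4096) : sink;
        try (DataOutputStream out = new DataOutputStream(target)) {
            for (int i = 0; i < records; i++) {
                out.writeInt(i);        // 4-byte key
                out.writeLong(i * 31L); // 8-byte value
            }
        } // close() flushes whatever is still buffered
        return sink.writeCalls;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("unbuffered write calls: " + countWrites(10_000, false));
        System.out.println("buffered write calls:   " + countWrites(10_000, true));
    }
}
```

Without the buffer, every field becomes at least one write request on the underlying stream; with it, requests are issued roughly once per 4 KiB.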

The restore process also performs many small I/O operations, but it reads without a buffer. I therefore added a read buffer and tested it with a Flink job.
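The read side can be sketched the same way. Again this is plain java.io under stated assumptions, not Flink's restore code: CountingInputStream is a hypothetical in-memory stand-in for the checkpoint file that counts read requests, and a 4 KiB BufferedInputStream (arbitrary size) stands in for the proposed read buffer. The toy restore loop decodes each (int key, long value) record with DataInputStream, which, roughly like deserializing state entries one by one, issues many tiny reads.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

public class ReadBufferDemo {

    /** Hypothetical in-memory stand-in for a checkpoint file; counts read requests that reach it. */
    static final class CountingInputStream extends InputStream {
        private final byte[] data;
        private int pos = 0;
        long readCalls = 0;

        CountingInputStream(byte[] data) { this.data = data; }

        @Override
        public int read() {
            readCalls++;
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }

        @Override
        public int read(byte[] b, int off, int len) {
            readCalls++;
            if (len == 0) return 0;
            if (pos >= data.length) return -1;
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }
    }

    /** Builds a fake snapshot of `records` (int key, long value) pairs, 12 bytes each. */
    public static byte[] snapshot(int records) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < records; i++) {
                out.writeInt(i);
                out.writeLong(i * 31L);
            }
        }
        return bytes.toByteArray();
    }

    /** Decodes every record (a toy "restore") and returns how many reads reached the source. */
    public static long countReads(byte[] snapshot, int records, boolean buffered) throws IOException {
        CountingInputStream source = new CountingInputStream(snapshot);
        InputStream in = buffered ? new BufferedInputStream(source, 4096) : source;
        try (DataInputStream data = new DataInputStream(in)) {
            for (int i = 0; i < records; i++) {
                int key = data.readInt();
                long value = data.readLong();
                if (key != i || value != i * 31L) {
                    throw new IllegalStateException("corrupt record " + i);
                }
            }
        }
        return source.readCalls;
    }

    public static void main(String[] args) throws IOException {
        byte[] snap = snapshot(10_000);
        System.out.println("unbuffered read calls: " + countReads(snap, 10_000, false));
        System.out.println("buffered read calls:   " + countReads(snap, 10_000, true));
    }
}
```

The decoded data is identical in both modes; only the number of requests reaching the underlying stream changes, from at least two per record to roughly one per 4 KiB.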
h2. Flink Job environment:
{code:java}
 Flink version: 1.10
 StateBackend : FsStateBackend 
 code: Flink SQL count(distinct userId)
 uv: 10 million
 State size: 200M
 TM total memory: 16G
 Parallelism: 1{code}
Restoring takes 33.1s without a read buffer and 12.8s with one, i.e. about 2.6x faster (roughly 61% less restore time).
h2. How to do it?

Wrap fsDataInputStream with FSDataBufferedInputStream in HeapRestoreOperation#restore:
{code:java}
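FSDataInputStream fsDataInputStream = keyGroupsStateHandle.openInputStream();
// Wrap the raw checkpoint stream so that the many small reads during restore are served from an in-memory buffer.
FSDataInputStream bufferedInputStream = new FSDataBufferedInputStream(fsDataInputStream);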
{code}
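Swapping in a buffered wrapper is only safe if position tracking keeps working, since a restore may seek within the checkpoint file (for example, to per-key-group offsets; this is an assumption about the call pattern, not something stated in the issue). The sketch below is a hypothetical, self-contained illustration of how such a wrapper can keep seek() and getPos() consistent; it is not the code of FSDataBufferedInputStream. SeekableStream is an assumed minimal stand-in modelled on FSDataInputStream (an InputStream with seek(long) and getPos()), and MemorySeekableStream is an in-memory test double that counts read requests.

```java
import java.io.IOException;
import java.io.InputStream;

public class BufferedSeekableDemo {

    /** Hypothetical minimal stand-in modelled on Flink's FSDataInputStream. */
    public abstract static class SeekableStream extends InputStream {
        public abstract void seek(long desired) throws IOException;
        public abstract long getPos() throws IOException;
    }

    /** In-memory test double for a checkpoint file; counts read requests that reach it. */
    public static final class MemorySeekableStream extends SeekableStream {
        private final byte[] data;
        private int pos;
        public long readCalls;

        public MemorySeekableStream(byte[] data) { this.data = data; }
        @Override public void seek(long desired) { pos = (int) desired; }
        @Override public long getPos() { return pos; }

        @Override public int read() {
            readCalls++;
            return pos < data.length ? (data[pos++] & 0xFF) : -1;
        }

        @Override public int read(byte[] b, int off, int len) {
            readCalls++;
            if (len == 0) return 0;
            if (pos >= data.length) return -1;
            int n = Math.min(len, data.length - pos);
            System.arraycopy(data, pos, b, off, n);
            pos += n;
            return n;
        }
    }

    /** Hypothetical buffered wrapper that keeps seek()/getPos() consistent with the bytes handed out. */
    public static final class BufferedSeekableStream extends SeekableStream {
        private final SeekableStream in;
        private final byte[] buf;
        private long bufStart; // file offset of buf[0]; invariant: in.getPos() == bufStart + count
        private int count;     // number of valid bytes in buf
        private int pos;       // index of the next byte to return from buf

        public BufferedSeekableStream(SeekableStream in, int size) throws IOException {
            this.in = in;
            this.buf = new byte[size];
            this.bufStart = in.getPos();
        }

        /** Refills the buffer with one large read; returns false at end of stream. */
        private boolean fill() throws IOException {
            bufStart = in.getPos();
            pos = 0;
            count = Math.max(in.read(buf, 0, buf.length), 0);
            return count > 0;
        }

        @Override public int read() throws IOException {
            if (pos >= count && !fill()) return -1;
            return buf[pos++] & 0xFF;
        }

        @Override public int read(byte[] b, int off, int len) throws IOException {
            if (len == 0) return 0;
            if (pos >= count && !fill()) return -1;
            int n = Math.min(len, count - pos);
            System.arraycopy(buf, pos, b, off, n);
            pos += n;
            return n;
        }

        @Override public long getPos() { return bufStart + pos; }

        @Override public void seek(long desired) throws IOException {
            if (desired >= bufStart && desired <= bufStart + count) {
                pos = (int) (desired - bufStart); // already buffered: no I/O needed
            } else {
                in.seek(desired); // outside the buffer: discard it and seek the real stream
                bufStart = desired;
                count = 0;
                pos = 0;
            }
        }

        @Override public void close() throws IOException { in.close(); }
    }

    public static void main(String[] args) throws IOException {
        byte[] data = new byte[10_000];
        for (int i = 0; i < data.length; i++) data[i] = (byte) i;
        MemorySeekableStream raw = new MemorySeekableStream(data);
        try (BufferedSeekableStream in = new BufferedSeekableStream(raw, 4096)) {
            for (int i = 0; i < 100; i++) in.read();
            in.seek(9_000);
            in.read();
            System.out.println("position: " + in.getPos() + ", underlying reads: " + raw.readCalls);
        }
    }
}
```

Seeking to an offset that is already buffered only moves the in-buffer cursor; any other seek drops the buffer and forwards the seek, so getPos() reports the logical position rather than how far the underlying stream has read ahead.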
 


> Read checkpoint stream with buffer to speedup restore
> -----------------------------------------------------
>
>                 Key: FLINK-19911
>                 URL: https://issues.apache.org/jira/browse/FLINK-19911
>             Project: Flink
>          Issue Type: Improvement
>          Components: FileSystems, Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.12.0, 1.11.3, 1.13.0
>         Environment: Flink version: 1.10
> StateBackend : FsStateBackend 
> code: Flink SQL count(distinct userId)
> uv: 10 million
> State size: 200M
> TM total memory: 16G
> Parallelism: 1
>            Reporter: fanrui
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
