Yue Ma created FLINK-31238:
------------------------------

             Summary: Use IngestDB to speed up RocksDB rescaling recovery
                 Key: FLINK-31238
                 URL: https://issues.apache.org/jira/browse/FLINK-31238
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing
    Affects Versions: 1.16.1
            Reporter: Yue Ma
             Fix For: 1.16.2
         Attachments: image-2023-02-27-16-41-18-552.png

There have been many discussions in the community about optimizing RocksDB 
rescaling and recovery, for example:

https://issues.apache.org/jira/browse/FLINK-17971

https://issues.apache.org/jira/browse/FLINK-8845

https://issues.apache.org/jira/browse/FLINK-21321

We would like to discuss some of our explorations under this ticket.

Rescaling recovery with RocksDB essentially requires two steps:
 # Insert the valid key-group data for the new task.
 # Delete the invalid data from the old StateHandles.

The current approach is to open the main DB first and then insert the remaining 
data using WriteBatch. In addition, deleteRange is used to speed up clipping the 
DB to the task's key-group range. But in our production environment we found 
that rescaling is still very slow, especially when the state of a single task 
is large.
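
For reference, here is a condensed sketch of what the current WriteBatch-based 
restore roughly looks like. It is simplified from Flink's incremental restore 
path; {{isInKeyGroupRange}} and the range bounds are placeholders, and the 
snippet assumes the org.rocksdb classes (RocksDB, WriteBatch, WriteOptions, 
RocksIterator, ColumnFamilyHandle).

{code:java}
// Simplified sketch: re-insert the in-range entries of each temporary DB
// into the main DB through a write batch (this is the slow path today).
try (WriteBatch batch = new WriteBatch();
     WriteOptions options = new WriteOptions();
     RocksIterator iter = tmpDb.newIterator(columnFamily)) {
    for (iter.seekToFirst(); iter.isValid(); iter.next()) {
        if (isInKeyGroupRange(iter.key())) { // placeholder predicate
            batch.put(columnFamily, iter.key(), iter.value());
        }
    }
    mainDb.write(options, batch);
}
// Out-of-range data already in the main DB is dropped with deleteRange.
mainDb.deleteRange(columnFamily, invalidRangeBegin, invalidRangeEnd);
{code}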


We hope that the previous SST files can be reused directly when restoring 
state, instead of re-traversing all the data. So we made some attempts to 
optimize this in our internal versions of Flink and FRocksDB.


We added two APIs, *ClipDB* and *IngestDB*, in FRocksDB (a sketch of the 
assumed Java bindings follows this list).
 * ClipDB is used to clip the data of a DB to a given key range. Unlike 
db.DeleteRange and db.Delete, no DeleteValue or RangeTombstone entries are 
generated for the parts beyond the key range. Instead, we iterate over the 
FileMetaData of the DB and process each SST file. There are three cases:
If all keys of a file are within the required range, we keep the SST file and 
do nothing.
If all keys of the SST file are outside the specified range, we delete the 
file directly.
If only part of the SST file is needed, we rewrite the required keys into a 
new SST file.
All SST file changes are recorded in a single VersionEdit, and the current 
Version will LogAndApply this edit to make the changes take effect.
 * IngestDB is used to directly ingest all SST files of one DB into another 
DB. It must be strictly guaranteed that the keys of the two DBs do not 
overlap, which is easy to ensure in the Flink scenario. Files are hard-linked 
during ingestion, so it is very fast. At the same time, the file numbers of 
the main DB are incremented sequentially, and the SequenceNumber of the main 
DB is updated to the larger SequenceNumber of the two DBs.
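
For illustration, here is a minimal sketch of what the Java-side bindings of 
the two APIs might look like. Neither method exists in upstream RocksDB's JNI; 
the names, signatures, and javadoc below only restate what is described above 
and are assumptions of this ticket, not a definitive design.

{code:java}
import java.util.List;
import org.rocksdb.RocksDBException;

/** Hypothetical FRocksDB extensions; illustrative only, not upstream APIs. */
public interface ClipAndIngestDB {

    /**
     * Clips this DB to [beginKey, endKey): SST files entirely inside the range
     * are kept untouched, files entirely outside are deleted, and partially
     * overlapping files are rewritten. All file changes go into one VersionEdit
     * applied with LogAndApply, so no DeleteValue / RangeTombstone entries are
     * produced.
     */
    void clipDB(byte[] beginKey, byte[] endKey) throws RocksDBException;

    /**
     * Ingests all SST files of the given temporary DBs into this DB via hard
     * links. Callers must guarantee that the key ranges of all involved DBs do
     * not overlap (true for disjoint Flink key groups). File numbers are
     * assigned sequentially and the SequenceNumber is bumped to the maximum
     * across the DBs.
     */
    void ingestDB(List<ClipAndIngestDB> tmpDbs) throws RocksDBException;
}
{code}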

When IngestDB and ClipDB are supported, the state restoration logic is as 
follows (a sketch of the full sequence follows the figure below):
 * Open the first StateHandle as the main DB and pause compaction.
 * Clip the main DB to the key-group range of the task with ClipDB.
 * Open the other StateHandles in sequence as temporary DBs and clip each of 
them to the key-group range as well.
 * Ingest all temporary DBs into the main DB after they have been clipped.
 * Resume compaction on the main DB.
!image-2023-02-27-16-41-18-552.png!
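
Putting the steps together, a minimal sketch of the proposed restore sequence, 
reusing the hypothetical ClipAndIngestDB bindings sketched earlier. Here 
{{openAsDb}}, {{keyGroupPrefix}}, {{resumeCompaction}}, and the StateHandle 
usage are illustrative placeholders, not real Flink or FRocksDB APIs; the 
snippet assumes java.util.ArrayList and java.util.List.

{code:java}
// Sketch of the proposed restore path; every call below is hypothetical
// and only mirrors the five steps listed above.
static ClipAndIngestDB restore(List<StateHandle> handles,
                               int startKeyGroup, int endKeyGroup) throws Exception {
    byte[] begin = keyGroupPrefix(startKeyGroup); // inclusive lower bound
    byte[] end = keyGroupPrefix(endKeyGroup + 1); // exclusive upper bound

    // 1. Open the first StateHandle as the main DB with compaction paused.
    ClipAndIngestDB mainDb = openAsDb(handles.get(0), /* pauseCompaction */ true);

    // 2. Clip the main DB to this task's key-group range.
    mainDb.clipDB(begin, end);

    // 3. Open the remaining StateHandles as temporary DBs and clip them too.
    List<ClipAndIngestDB> tmpDbs = new ArrayList<>();
    for (StateHandle handle : handles.subList(1, handles.size())) {
        ClipAndIngestDB tmpDb = openAsDb(handle, true);
        tmpDb.clipDB(begin, end);
        tmpDbs.add(tmpDb);
    }

    // 4. Ingest the clipped temporary DBs into the main DB (hard links only,
    //    no data rewrite).
    mainDb.ingestDB(tmpDbs);

    // 5. Resume compaction on the main DB.
    resumeCompaction(mainDb); // placeholder helper
    return mainDb;
}
{code}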

We ran some benchmarks on our internal Flink version. The results show that, 
compared with the WriteBatch method, rescaling recovery with IngestDB is 5 to 
10 times faster, as shown below.

 * parallelism changes from 4 to 2

|*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
|500M|Iteration 1: 8.018 s/op
Iteration 2: 9.551 s/op
Iteration 3: 7.486 s/op|Iteration 1: 6.041 s/op
Iteration 2: 5.934 s/op
Iteration 3: 6.707 s/op|{color:#FF0000}Iteration 1: 3.922 s/op{color}
{color:#FF0000}Iteration 2: 3.208 s/op{color}
{color:#FF0000}Iteration 3: 3.096 s/op{color}|
|1G|Iteration 1: 19.686 s/op
Iteration 2: 19.402 s/op
Iteration 3: 21.146 s/op|Iteration 1: 17.538 s/op
Iteration 2: 16.933 s/op
Iteration 3: 15.486 s/op|{color:#FF0000}Iteration 1: 6.207 s/op{color}
{color:#FF0000}Iteration 2: 7.164 s/op{color}
{color:#FF0000}Iteration 3: 6.397 s/op{color}|
|5G|Iteration 1: 244.795 s/op
Iteration 2: 243.141 s/op
Iteration 3: 253.542 s/op|Iteration 1: 78.058 s/op
Iteration 2: 85.635 s/op
Iteration 3: 76.568 s/op|{color:#FF0000}Iteration 1: 23.397 s/op{color}
{color:#FF0000}Iteration 2: 21.387 s/op{color}
{color:#FF0000}Iteration 3: 22.858 s/op{color}|
 * parallelism changes from 4 to 8

|*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
|500M|Iteration 1: 3.477 s/op
Iteration 2: 3.515 s/op
Iteration 3: 3.433 s/op|Iteration 1: 3.453 s/op
Iteration 2: 3.300 s/op
Iteration 3: 3.313 s/op|{color:#FF0000}Iteration 1: 0.941 s/op{color}
{color:#FF0000}Iteration 2: 0.963 s/op{color}
{color:#FF0000}Iteration 3: 1.102 s/op{color}|
|1G|Iteration 1: 7.571 s/op
Iteration 2: 7.352 s/op
Iteration 3: 7.568 s/op|Iteration 1: 5.032 s/op
Iteration 2: 4.689 s/op
Iteration 3: 6.883 s/op|{color:#FF0000}Iteration 1: 2.130 s/op{color}
{color:#FF0000}Iteration 2: 2.110 s/op{color}
{color:#FF0000}Iteration 3: 2.034 s/op{color}|
|5G|Iteration 1: 91.870 s/op
Iteration 2: 94.229 s/op
Iteration 3: 93.271 s/op|Iteration 1: 25.845 s/op
Iteration 2: 25.571 s/op
Iteration 3: 25.685 s/op|{color:#FF0000}Iteration 1: 11.154 s/op{color}
{color:#FF0000}Iteration 2: 10.732 s/op{color}
{color:#FF0000}Iteration 3: 10.622 s/op{color}|
 * parallelism changes from 4 to 6

|*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
|500M|Iteration 1: 8.209 s/op
Iteration 2: 9.893 s/op
Iteration 3: 9.150 s/op|Iteration 1: 6.041 s/op
Iteration 2: 5.934 s/op
Iteration 3: 6.707 s/op|{color:#FF0000}Iteration 1: 2.622 s/op{color}
{color:#FF0000}Iteration 2: 2.545 s/op{color}
{color:#FF0000}Iteration 3: 2.573 s/op{color}|
|1G|Iteration 1: 21.206 s/op
Iteration 2: 26.214 s/op
Iteration 3: 20.269 s/op|Iteration 1: 10.043 s/op
Iteration 2: 10.744 s/op
Iteration 3: 10.461 s/op|{color:#FF0000}Iteration 1: 4.400 s/op{color}
{color:#FF0000}Iteration 2: 4.340 s/op{color}
{color:#FF0000}Iteration 3: 6.234 s/op{color}|
|5G|Iteration 1: 170.606 s/op
Iteration 2: 160.576 s/op
Iteration 3: 159.425 s/op|Iteration 1: 52.537 s/op
Iteration 2: 50.576 s/op
Iteration 3: 50.823 s/op|{color:#FF0000}Iteration 1: 19.053 s/op{color}
{color:#FF0000}Iteration 2: 18.504 s/op{color}
{color:#FF0000}Iteration 3: 18.249 s/op{color}|
 * parallelism changes from 4 to 3

|*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
|500M|Iteration 1: 6.330 s/op
Iteration 2: 5.614 s/op
Iteration 3: 5.736 s/op|Iteration 1: 4.083 s/op
Iteration 2: 5.655 s/op
Iteration 3: 3.998 s/op|{color:#FF0000}Iteration 1: 2.157 s/op{color}
{color:#FF0000}Iteration 2: 2.201 s/op{color}
{color:#FF0000}Iteration 3: 3.212 s/op{color}|
|1G|Iteration 1: 13.814 s/op
Iteration 2: 12.852 s/op
Iteration 3: 13.480 s/op|Iteration 1: 9.619 s/op
Iteration 2: 9.197 s/op
Iteration 3: 8.694 s/op|{color:#FF0000}Iteration 1: 4.227 s/op{color}
{color:#FF0000}Iteration 2: 4.234 s/op{color}
{color:#FF0000}Iteration 3: 4.177 s/op{color}|
|5G|Iteration 1: 136.621 s/op
Iteration 2: 127.097 s/op
Iteration 3: 139.694 s/op|Iteration 1: 39.612 s/op
Iteration 2: 38.809 s/op
Iteration 3: 39.125 s/op|{color:#FF0000}Iteration 1: 16.691 s/op{color}
{color:#FF0000}Iteration 2: 16.599 s/op{color}
{color:#FF0000}Iteration 3: 16.726 s/op{color}|


