[ https://issues.apache.org/jira/browse/FLINK-31238?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Yue Ma updated FLINK-31238:
---------------------------
    Attachment: image-2023-03-09-15-46-01-176.png

> Use IngestDB to speed up Rocksdb rescaling recovery
> ----------------------------------------------------
>
>                 Key: FLINK-31238
>                 URL: https://issues.apache.org/jira/browse/FLINK-31238
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / State Backends
>    Affects Versions: 1.16.1
>            Reporter: Yue Ma
>            Priority: Major
>         Attachments: image-2023-02-27-16-41-18-552.png,
>                      image-2023-02-27-16-57-18-435.png, image-2023-03-07-14-27-10-260.png,
>                      image-2023-03-09-15-23-30-581.png, image-2023-03-09-15-26-12-314.png,
>                      image-2023-03-09-15-28-32-363.png, image-2023-03-09-15-41-03-074.png,
>                      image-2023-03-09-15-41-08-379.png, image-2023-03-09-15-45-56-081.png,
>                      image-2023-03-09-15-46-01-176.png
>
>
> There have been many discussions and optimizations in the community about RocksDB rescaling and recovery:
> https://issues.apache.org/jira/browse/FLINK-17971
> https://issues.apache.org/jira/browse/FLINK-8845
> https://issues.apache.org/jira/browse/FLINK-21321
> We would like to discuss some of our explorations under this ticket.
>
> Rescaling recovery in RocksDB essentially consists of two steps:
> # Insert the valid KeyGroup data for the new task.
> # Delete the invalid data from the old StateHandles.
> Currently the data is written by choosing one state handle as the main DB and inserting the remaining data with writeBatch; deleteRange is used to speed up clipping the DB. In our production environment, however, rescaling is still very slow, especially when the state of a single Task is large.
>
> We would like to reuse the previous sst files directly when restoring state, instead of re-traversing the data, so we made some attempts to optimize this in our internal versions of Flink and frocksdb.
>
> We added two APIs, *ClipDb* and *IngestDb*, to frocksdb.
> * ClipDB clips the data of a DB to a key range. Unlike db.DeleteRange and db.Delete, no DeleteValue or RangeTombstone is generated for the parts beyond the key range. Instead, we iterate over the FileMetaData of the DB and process each sst file; there are three cases (a rough sketch of this per-file decision follows the list):
> ** If all keys of the file are within the required range, we keep the sst file and do nothing.
> ** If all keys of the sst file are outside the specified range, we delete the file directly.
> ** If only part of the sst file is needed, we rewrite the required keys into a new sst file.
> All sst file changes are collected in one VersionEdit, and the current Version will LogAndApply this edit to make the changes take effect.
> * IngestDb ingests all sst files of one DB directly into another DB. It must be strictly guaranteed that the keys of the two DBs do not overlap, which is easy to achieve in the Flink scenario. The files are ingested via hard links, so this is very fast. At the same time, the file numbers of the main DB are incremented sequentially, and the SequenceNumber of the main DB is updated to the larger SequenceNumber of the two DBs.
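>
> The per-file decision above can be illustrated with a short sketch. This is only an illustration of the intended semantics, not the actual frocksdb change (which is C++ code working directly on FileMetaData and VersionEdit); SstFileMeta, VersionEditBuilder and rewriteInRange are hypothetical stand-ins.
> {code:java}
> // Illustrative sketch of the proposed ClipDB per-file decision.
> // All types and helpers here are hypothetical stand-ins for frocksdb internals.
> import java.util.Comparator;
> import java.util.List;
>
> final class ClipDbSketch {
>
>     /** Hypothetical view of one sst file's key range, taken from the DB's file metadata. */
>     record SstFileMeta(String fileName, byte[] smallestKey, byte[] largestKey) {}
>
>     /** Hypothetical collector of file additions/deletions that end up in one VersionEdit. */
>     interface VersionEditBuilder {
>         void deleteFile(String fileName);
>         void addFile(String fileName);
>     }
>
>     /** Hypothetical helper that rewrites only the keys inside [start, end) into a new sst file. */
>     static String rewriteInRange(SstFileMeta file, byte[] start, byte[] end) {
>         throw new UnsupportedOperationException("sketch only");
>     }
>
>     static void clip(List<SstFileMeta> files, byte[] start, byte[] end,
>                      Comparator<byte[]> cmp, VersionEditBuilder edit) {
>         for (SstFileMeta f : files) {
>             boolean fullyInside = cmp.compare(f.smallestKey(), start) >= 0
>                     && cmp.compare(f.largestKey(), end) < 0;
>             boolean fullyOutside = cmp.compare(f.largestKey(), start) < 0
>                     || cmp.compare(f.smallestKey(), end) >= 0;
>             if (fullyInside) {
>                 // Case 1: every key of the file is needed -> keep the sst file untouched.
>             } else if (fullyOutside) {
>                 // Case 2: no key of the file is needed -> drop the file from the version.
>                 edit.deleteFile(f.fileName());
>             } else {
>                 // Case 3: the file straddles the range boundary -> rewrite only the needed keys.
>                 edit.deleteFile(f.fileName());
>                 edit.addFile(rewriteInRange(f, start, end));
>             }
>         }
>         // All collected changes form a single VersionEdit that the current Version
>         // applies via LogAndApply, so no tombstones are ever written.
>     }
> }
> {code}
>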
> When IngestDb and ClipDb are supported, the state restoration logic is as follows (a minimal code sketch is given after the figure):
> * Open the first StateHandle as the main DB and pause compaction.
> * Clip the main DB to the KeyGroup range of the Task with ClipDB.
> * Open the other StateHandles in sequence as temporary DBs and clip each of them to the KeyGroup range with ClipDB.
> * Ingest every temporary DB into the main DB once it has been clipped.
> * Resume compaction on the main DB.
> !image-2023-02-27-16-57-18-435.png|width=434,height=152!
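>
> To make this flow concrete, the following is a minimal sketch of what the restore path could look like on the Flink side. clipDb and ingestDb are hypothetical names for the proposed frocksdb operations and are not part of upstream RocksDB/RocksJava; column families and error handling are omitted for brevity.
> {code:java}
> // Minimal sketch of the proposed restore path. FrocksDbExt stands in for the
> // hypothetical JNI surface of the two proposed frocksdb operations.
> import java.util.List;
> import org.rocksdb.Options;
> import org.rocksdb.RocksDB;
> import org.rocksdb.RocksDBException;
>
> final class IngestDbRestoreSketch {
>
>     interface FrocksDbExt {
>         /** Drop all data outside [keyGroupStart, keyGroupEnd) without writing tombstones. */
>         void clipDb(RocksDB db, byte[] keyGroupStart, byte[] keyGroupEnd) throws RocksDBException;
>         /** Hard-link all sst files of tmp into main; the key ranges must not overlap. */
>         void ingestDb(RocksDB main, RocksDB tmp) throws RocksDBException;
>     }
>
>     static RocksDB restore(FrocksDbExt ext, List<String> stateHandlePaths,
>                            byte[] keyGroupStart, byte[] keyGroupEnd) throws RocksDBException {
>         RocksDB.loadLibrary();
>
>         // 1. Open the first StateHandle as the main DB and pause compaction.
>         RocksDB mainDb = RocksDB.open(new Options().setCreateIfMissing(false),
>                 stateHandlePaths.get(0));
>         mainDb.pauseBackgroundWork();
>
>         // 2. Clip the main DB to this Task's KeyGroup range.
>         ext.clipDb(mainDb, keyGroupStart, keyGroupEnd);
>
>         // 3. Open every other StateHandle as a temporary DB, clip it, then ingest it.
>         for (String path : stateHandlePaths.subList(1, stateHandlePaths.size())) {
>             try (RocksDB tmpDb = RocksDB.open(new Options(), path)) {
>                 ext.clipDb(tmpDb, keyGroupStart, keyGroupEnd);
>                 ext.ingestDb(mainDb, tmpDb);
>             }
>         }
>
>         // 4. Resume compaction on the main DB.
>         mainDb.continueBackgroundWork();
>         return mainDb;
>     }
> }
> {code}
> Compared with the writeBatch baseline, most of the data is hard-linked instead of being re-read and re-written, which is where the speedup in the benchmarks below comes from.
>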
> We ran benchmark tests on our internal Flink version. The results show that, compared with the writeBatch method, rescaling recovery with IngestDb is 5 to 10 times faster. SST_File_Writer below refers to the recovery method that generates sst files in parallel through SstFileWriter. All numbers are seconds per operation (s/op), three iterations per cell.
>
> * Parallelism changes from 4 to 2:
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|8.018; 9.551; 7.486|6.041; 5.934; 6.707|{color:#ff0000}3.922; 3.208; 3.096{color}|
> |1G|19.686; 19.402; 21.146|17.538; 16.933; 15.486|{color:#ff0000}6.207; 7.164; 6.397{color}|
> |5G|244.795; 243.141; 253.542|78.058; 85.635; 76.568|{color:#ff0000}23.397; 21.387; 22.858{color}|
>
> * Parallelism changes from 4 to 8:
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|3.477; 3.515; 3.433|3.453; 3.300; 3.313|{color:#ff0000}0.941; 0.963; 1.102{color}|
> |1G|7.571; 7.352; 7.568|5.032; 4.689; 6.883|{color:#ff0000}2.130; 2.110; 2.034{color}|
> |5G|91.870; 94.229; 93.271|25.845; 25.571; 25.685|{color:#ff0000}11.154; 10.732; 10.622{color}|
>
> * Parallelism changes from 4 to 6:
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|8.209; 9.893; 9.150|6.041; 5.934; 6.707|{color:#ff0000}2.622; 2.545; 2.573{color}|
> |1G|21.206; 26.214; 20.269|10.043; 10.744; 10.461|{color:#ff0000}4.400; 4.340; 6.234{color}|
> |5G|170.606; 160.576; 159.425|52.537; 50.576; 50.823|{color:#ff0000}19.053; 18.504; 18.249{color}|
>
> * Parallelism changes from 4 to 3:
> |*TaskStateSize*|*Write_Batch*|*SST_File_Writer*|*Ingest_DB*|
> |500M|6.330; 5.614; 5.736|4.083; 5.655; 3.998|{color:#ff0000}2.157; 2.201; 3.212{color}|
> |1G|13.814; 12.852; 13.480|9.619; 9.197; 8.694|{color:#ff0000}4.227; 4.234; 4.177{color}|
> |5G|136.621; 127.097; 139.694|39.612; 38.809; 39.125|{color:#ff0000}16.691; 16.599; 16.726{color}|



--
This message was sent by Atlassian Jira
(v8.20.10#820010)