Hi Joey

Previously, I also looked at the mechanism to create on-disk SSTables, as I 
planned to use RocksDB's benchmarks to mock the scenario in Flink. However, I 
found the main challenge is ensuring the keys are inserted in strictly 
increasing order: the key order in Java could differ from the byte order in 
RocksDB. In your case, I think it could be much easier, as 
RocksFullSnapshotStrategy writes data per column family per key group, which 
should already be in strictly increasing order [1].
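
For illustration, here is a minimal sketch of that ordering difference 
(assuming Java 9+ for Arrays.compare/compareUnsigned; RocksDB's default 
BytewiseComparator orders keys as unsigned bytes, memcmp-style):

    import java.util.Arrays;

    public class KeyOrderCheck {
        public static void main(String[] args) {
            byte[] a = new byte[] {(byte) 0x10};
            byte[] b = new byte[] {(byte) 0x80}; // -128 as a signed Java byte

            // Signed Java comparison: 0x80 (-128) < 0x10 (16), so b sorts before a.
            System.out.println(Arrays.compare(a, b) > 0);          // true

            // Unsigned, memcmp-style comparison, as RocksDB's default comparator
            // does: 0x10 (16) < 0x80 (128), so a sorts before b.
            System.out.println(Arrays.compareUnsigned(a, b) < 0);  // true
        }
    }

So keys sorted with Java's signed byte ordering are not necessarily strictly 
increasing under RocksDB's comparator.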

FLINK-17288 (https://issues.apache.org/jira/browse/FLINK-17288) could mitigate 
the performance issue, and your solution could improve the performance even 
further (and could integrate with the state-processor-api story).

On the other hand, as an out-of-the-box option for production in your 
scenario, how about using checkpoints to recover? They also support rescaling 
and normal recovery.

[1] 
https://github.com/apache/flink/blob/f35679966eac9e3bb53a02bcdbd36dbd1341d405/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java#L308


Best
Yun Tang
________________________________
From: Joey Pereira <j...@stripe.com>
Sent: Tuesday, May 19, 2020 2:27
To: user@flink.apache.org <user@flink.apache.org>
Cc: Mike Mintz <mikemi...@stripe.com>; Shahid Chohan <cho...@stripe.com>; Aaron 
Levin <aaronle...@stripe.com>
Subject: RocksDB savepoint recovery performance improvements

Hey,

While running a Flink application with large state, savepoint recovery has 
been a painful part of operating the application because recovery can take 
several hours. During some profiling that chohan (cc'd) had done, a red flag 
stood out: savepoint recovery consisted mostly of RocksDB Get and Put 
operations.

When Flink is bootstrapping state for RocksDB instances, this is not what I 
would have expected, as RocksDB supports direct ingestion of its on-disk format 
(SSTables): 
https://github.com/facebook/rocksdb/wiki/Creating-and-Ingesting-SST-files. This 
was also recently reported on Jira: 
https://issues.apache.org/jira/browse/FLINK-17288.

From what I understood of the current implementation:

* The snapshot restoration pathways, RocksDBFullRestoreOperation and 
RocksDBIncrementalRestoreOperation, use RocksDBWriteBatchWrapper.

* RocksDBWriteBatchWrapper uses RocksDB's WriteBatch API. This provides 
atomicity per batch as well as performance benefits from batching, compared to 
individual Puts, but it still goes through RocksDB's normal insert path, which 
can involve expensive operations[0] (a rough sketch of this pattern follows 
this list).
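
For reference, here is a rough, hypothetical simplification of that 
write-batching pattern (not the actual RocksDBWriteBatchWrapper code; the 
flush threshold and entry type are made up for illustration):

    import java.util.List;
    import java.util.Map;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.WriteBatch;
    import org.rocksdb.WriteOptions;

    final class BatchedRestore {
        private static final int FLUSH_THRESHOLD = 500; // illustrative value

        // Accumulate Puts into a WriteBatch and flush once the threshold is hit.
        static void restore(RocksDB db,
                            ColumnFamilyHandle handle,
                            List<Map.Entry<byte[], byte[]>> entries) throws RocksDBException {
            try (WriteOptions writeOptions = new WriteOptions().setDisableWAL(true);
                 WriteBatch batch = new WriteBatch()) {
                for (Map.Entry<byte[], byte[]> kv : entries) {
                    batch.put(handle, kv.getKey(), kv.getValue());
                    if (batch.count() >= FLUSH_THRESHOLD) {
                        // Each flush still goes through RocksDB's regular write
                        // path (memtable insert, later flush and compaction).
                        db.write(writeOptions, batch);
                        batch.clear();
                    }
                }
                db.write(writeOptions, batch);
            }
        }
    }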

Instead, by creating SSTable files and instructing RocksDB to ingest them, 
writes can be batched even further and RocksDB's expensive insert operations 
can be avoided. This is commonly used by other systems for restoration or 
import processes, such as CockroachDB[1], TiDB[2], and Percona[3]. There are 
some restrictions on being able to generate SSTables, as well as limitations 
for ingestion to be performant. Unfortunately, it's all not very well 
documented:

1. When generating an SSTable, keys need to be inserted in order.

2. Ingested files should not have key-ranges that overlap with either existing 
data or other ingested files[4]. It is possible to ingest overlapping SSTables, 
but this may incur significant overhead. (A sketch of generating and ingesting 
an SSTable follows this list.)
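
Concretely, here is a hypothetical sketch of the proposed path using 
RocksJava's SstFileWriter and ingestExternalFile (the helper name, entry type, 
and file-path handling are assumptions for illustration; the input must 
already be sorted in the column family's comparator order):

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.EnvOptions;
    import org.rocksdb.IngestExternalFileOptions;
    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.SstFileWriter;

    final class SstRestore {
        // Write sorted entries into an external SSTable, then ingest the file.
        static void restore(RocksDB db,
                            ColumnFamilyHandle handle,
                            Options options,
                            List<Map.Entry<byte[], byte[]>> sortedEntries,
                            String sstPath) throws RocksDBException {
            try (EnvOptions envOptions = new EnvOptions();
                 SstFileWriter writer = new SstFileWriter(envOptions, options)) {
                writer.open(sstPath);
                for (Map.Entry<byte[], byte[]> kv : sortedEntries) {
                    // Keys must arrive in strictly increasing comparator order.
                    writer.put(kv.getKey(), kv.getValue());
                }
                writer.finish();
            }
            try (IngestExternalFileOptions ingestOptions =
                     new IngestExternalFileOptions().setMoveFiles(true)) {
                // Cheapest when the file's key range does not overlap existing data.
                db.ingestExternalFile(handle, Collections.singletonList(sstPath), ingestOptions);
            }
        }
    }

In the Flink case, one SSTable per column family per key group (or per 
non-overlapping key range) would presumably keep the ingested files disjoint.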

To generate SSTables with non-overlapping key-ranges and with keys in order, 
the savepoint data would need to be ordered while processing it. I'm unsure 
whether that is how Flink's savepoints are stored.

I have not dug into RocksDBIncrementalRestoreOperation yet, or how it is used 
(e.g., for incremental checkpoints or something else). I did notice it iterates 
over a temporary RocksDB instance and inserts into a "final" instance. These 
writes could be optimized in a similar manner. Alternatively, it could be 
possible to ingest the temporary instance's SSTables directly and prune the 
unneeded data with RocksDB's DeleteRange (a sketch follows).
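
A minimal sketch of that pruning idea (the helper and key-range parameters are 
hypothetical; DeleteRange writes a single range tombstone instead of per-key 
deletes):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    final class KeyGroupPruning {
        // Drop a key range that does not belong to this subtask after ingesting
        // the temporary instance's SSTables wholesale.
        static void pruneRange(RocksDB db,
                               ColumnFamilyHandle handle,
                               byte[] beginKeyInclusive,
                               byte[] endKeyExclusive) throws RocksDBException {
            db.deleteRange(handle, beginKeyInclusive, endKeyExclusive);
        }
    }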

To get started with prototyping, I was thinking of taking a simple approach: 
introduce an interface for RocksDBWriteBatchWrapper and swap in an 
implementation that does SSTable generation and ingestion (a rough interface 
sketch is below). I reckon that will be an easy way to sanity-check whether it 
works at all.
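
As a strawman, the interface might look something like this (entirely 
hypothetical naming, just to show the shape of the swap):

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDBException;

    interface RestoreWriteStrategy extends AutoCloseable {
        // Buffer a key/value pair for the given column family.
        void put(ColumnFamilyHandle handle, byte[] key, byte[] value) throws RocksDBException;

        // Flush buffered data: a WriteBatch write in the current implementation,
        // or finish-and-ingest an SSTable in the new one.
        void flush() throws RocksDBException;

        @Override
        void close() throws RocksDBException;
    }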

I was also planning on writing some benchmarks in RocksDB to understand the 
difference for ingestion scenarios, as RocksDB itself is sparse on details 
about SSTable ingestion[4] and does not ship benchmarks for ingestion.

Does all of that seem sound? I'll report back when I get time to work out that 
implementation and tests, likely during the coming weekend.


Joey

---

[0]: I don't have any specific sources on this. At a high level, some of the 
operations happening during writes include writing to the memtable before 
flushing to an SSTable, plus merging and/or compaction. In general, these add 
write amplification and overall overhead to bulk insertion. They can largely be 
avoided by giving RocksDB SSTables directly, especially if they have 
non-overlapping key-ranges. "Characterizing, Modeling, and Benchmarking 
RocksDB Key-Value Workloads at Facebook" 
(https://www.usenix.org/system/files/fast20-cao_zhichao.pdf) is a helpful 
source that highlights what happens during various workloads.


[1]: CockroachDB is a database that uses RocksDB as its on-disk storage. Their 
implementation consolidates bulk ingestion into an AddSSTable command. In 
particular, their choices of options for SSTable generation and ingestion are 
of interest:

* SSTable generation: 
https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L929-L966

* SSTable ingestion: 
https://github.com/cockroachdb/cockroach/blob/c9aeb373511283db21b83c3c5a776ec2da2da1ed/c-deps/libroach/db.cc#L842-L917


[2]: TiDB, and TiKV (the KV layer of the database), use RocksDB as the on-disk 
storage. Their implementation of bulk ingestion is contained within: 
https://github.com/tikv/tikv/blob/master/components/sst_importer/src/sst_importer.rs

Other useful references:
- https://github.com/tikv/tikv/issues/2404, discussing the performance of the 
copy vs. move options.


[3]: Percona Server is a MySQL-based SQL database that supports a RocksDB 
storage engine (MyRocks). Their implementation of ingestion can be found here: 
https://github.com/percona/percona-server/blob/a259dc92e76e1569bc5ed34993cc7fc6af43832a/storage/rocksdb/ha_rocksdb.cc#L2815


[4]: Again, there are not many official resources on this. Notable references 
I found include:

* https://github.com/facebook/rocksdb/issues/2473, which describes at a high 
level how re-insertions work.

* https://github.com/facebook/rocksdb/issues/3540, which describes the 
performance costs of ingesting overlapping SSTables, with specific benchmarks 
(post-fix) here: https://github.com/facebook/rocksdb/pull/3564

* https://github.com/facebook/rocksdb/pull/3179, which describes the mechanism 
for ingesting SSTable files: there need to be point-key overlap checks for the 
LSM.

* https://github.com/facebook/rocksdb/issues/5133, which indicates that 
re-ingesting the same SSTable (due to restarts in import processes) can cause 
issues for a particular set of options.

* https://github.com/facebook/rocksdb/issues/5770#issuecomment-528488245, which 
indicates that compaction occurs more (or only) when overlapping SSTables are 
ingested. The thinking here is that non-overlapping SSTable ingestion means 
very few operations (compaction, merging, etc.) occur afterward, given the 
right tuning for generation and ingestion.

* https://github.com/facebook/rocksdb/issues/5010, which discusses some 
unresolved issues for high CPU overhead on ingestion.
