Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5582#discussion_r192335797

--- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---
@@ -547,4 +549,30 @@ public boolean accept(File file, String s) {
             return true;
         }
     }
+
+    private static class TestRocksDBStateBackend extends RocksDBStateBackend {
+
+        public TestRocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
+            super(checkpointStreamBackend, enableIncrementalCheckpointing);
+        }
+
+        @Override
+        public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+            Environment env,
+            JobID jobID,
+            String operatorIdentifier,
+            TypeSerializer<K> keySerializer,
+            int numberOfKeyGroups,
+            KeyGroupRange keyGroupRange,
+            TaskKvStateRegistry kvStateRegistry) throws IOException {
+
+            AbstractKeyedStateBackend<K> keyedStateBackend = super.createKeyedStateBackend(
+                env, jobID, operatorIdentifier, keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry);
+
+            // We ignore the range deletions on production, but when we are running the tests we shouldn't ignore it.
--- End diff --

I introduced this because we may call `deleteRange()` when rescaling from an incremental checkpoint. According to the comment in RocksDB's C++ source, to avoid the read-performance penalty of range deletions we should set `readOptions.setIgnoreRangeDeletions(true);`.

- In production that is fine, because we never query a record that belongs to a key group we have deleted.
- When running tests, however, we may need to verify that after restoring from a checkpoint the backend did not take in any key group outside the target key-group range (e.g. `StateBackendTestBase#testKeyGroupSnapshotRestore()`).
The reason we need `readOptions.setIgnoreRangeDeletions(false);` in this case can be illustrated as follows:

```java
// Pseudocode: range1 stands for a deleted key range, keyInRange1 for any key inside it.
db.deleteRange(range1);

readOptions.setIgnoreRangeDeletions(true);
db.get(readOptions, keyInRange1); // may not be null, because the range deletion is ignored

readOptions.setIgnoreRangeDeletions(false);
db.get(readOptions, keyInRange1); // this will be null
```
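To make the difference concrete without a RocksDB dependency, here is a toy in-memory model. It is only a sketch and not how RocksDB is implemented: the class `ToyRangeDeleteStore` and its methods are hypothetical, and it assumes that a range deletion is recorded as a separate tombstone which a read consults only when range deletions are not ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model for illustration only; this is NOT RocksDB and NOT its API.
class ToyRangeDeleteStore {
    private final Map<Integer, String> values = new HashMap<>();
    private final Map<Integer, Long> writeSeq = new HashMap<>();
    private final List<long[]> tombstones = new ArrayList<>(); // {begin, end, seq}
    private long nextSeq = 0;

    void put(int key, String value) {
        values.put(key, value);
        writeSeq.put(key, nextSeq++);
    }

    // Records a tombstone covering [begin, end); point entries are left in place.
    void deleteRange(int begin, int end) {
        tombstones.add(new long[] {begin, end, nextSeq++});
    }

    // ignoreRangeDeletions plays the role of readOptions.setIgnoreRangeDeletions(...).
    String get(int key, boolean ignoreRangeDeletions) {
        String value = values.get(key);
        if (value == null || ignoreRangeDeletions) {
            return value; // tombstones are never consulted on this path
        }
        long seq = writeSeq.get(key);
        for (long[] t : tombstones) {
            if (key >= t[0] && key < t[1] && t[2] > seq) {
                return null; // a newer range tombstone covers this key
            }
        }
        return value;
    }
}
```

In this model, `put(5, "v5")` followed by `deleteRange(0, 10)` makes `get(5, true)` return `"v5"` and `get(5, false)` return `null`. That is the difference the test backend has to observe when it checks that no foreign key group survived the restore.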
---