mayuehappy commented on a change in pull request #16171:
URL: https://github.com/apache/flink/pull/16171#discussion_r654567336
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
##########
@@ -142,6 +142,39 @@ public RocksIncrementalSnapshotStrategy(
this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
}
+ public RocksIncrementalSnapshotStrategy(
Review comment:
I also think that the construction of `RocksIncrementalSnapshotStrategy`
and `uploader` is too tightly coupled, and the current implementation is
not very flexible. I will try constructing them separately.
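A minimal sketch of what constructing the two separately could look like. All types here (`StateUploader`, `DefaultStateUploader`, `IncrementalSnapshotStrategy`, `StrategyBuilder`) are simplified, hypothetical stand-ins and not the actual Flink classes or signatures; the point is only that the strategy receives a ready-made uploader through a single constructor, and the builder decides which uploader to pass.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical stand-in for the uploader; not Flink's real uploader API. */
interface StateUploader {
    List<String> upload(List<String> files);
}

/** Hypothetical default uploader, created by the builder when none is injected. */
final class DefaultStateUploader implements StateUploader {
    private final int numberOfThreads; // kept only to mirror the thread-count parameter

    DefaultStateUploader(int numberOfThreads) {
        this.numberOfThreads = numberOfThreads;
    }

    @Override
    public List<String> upload(List<String> files) {
        // Pretend to upload each file and return a handle name for it.
        List<String> handles = new ArrayList<>();
        for (String file : files) {
            handles.add("uploaded:" + file);
        }
        return handles;
    }
}

/** Simplified strategy: it never constructs an uploader itself. */
final class IncrementalSnapshotStrategy {
    private final StateUploader uploader;

    IncrementalSnapshotStrategy(StateUploader uploader) {
        this.uploader = Objects.requireNonNull(uploader);
    }

    List<String> snapshot(List<String> sstFiles) {
        return uploader.upload(sstFiles);
    }
}

/** Simplified builder: the only place that chooses between injected and default uploader. */
final class StrategyBuilder {
    private StateUploader injectedUploader; // null means "use the default"
    private int numberOfTransferingThreads = 1;

    StrategyBuilder setUploader(StateUploader uploader) {
        this.injectedUploader = uploader;
        return this;
    }

    IncrementalSnapshotStrategy build() {
        StateUploader uploader =
                injectedUploader != null
                        ? injectedUploader
                        : new DefaultStateUploader(numberOfTransferingThreads);
        return new IncrementalSnapshotStrategy(uploader);
    }
}
```

With this shape, a test could inject a fake uploader via `setUploader(...)` without the strategy needing a second constructor.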
##########
File path:
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
##########
@@ -498,20 +505,36 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
RocksDBSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy;
if (enableIncrementalCheckpointing) {
checkpointSnapshotStrategy =
- new RocksIncrementalSnapshotStrategy<>(
- db,
- rocksDBResourceGuard,
- keySerializerProvider.currentSchemaSerializer(),
- kvStateInformation,
- keyGroupRange,
- keyGroupPrefixBytes,
- localRecoveryConfig,
- cancelStreamRegistry,
- instanceBasePath,
- backendUID,
- materializedSstFiles,
- lastCompletedCheckpointId,
- numberOfTransferingThreads);
+ injectRocksDBStateUploader != null
+ ? new RocksIncrementalSnapshotStrategy<>(
Review comment:
I think your approach is more rigorous. Please wait for my fix.