Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5239#discussion_r169895777 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -2123,9 +2162,21 @@ void takeSnapshot() throws Exception { checkpointId, sstFiles, miscFiles, - metaStateHandle); + metaStateHandle.getJobManagerOwnedSnapshot()); + + DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle(); + StreamStateHandle taskLocalSnapshotMetaDataStateHandle = metaStateHandle.getTaskLocalSnapshot(); + IncrementalLocalKeyedStateHandle directoryKeyedStateHandle = + directoryStateHandle != null && taskLocalSnapshotMetaDataStateHandle != null ? + new IncrementalLocalKeyedStateHandle( + stateBackend.backendUID, + checkpointId, + directoryStateHandle, + stateBackend.keyGroupRange, + taskLocalSnapshotMetaDataStateHandle) : + null; - return new SnapshotResult<>(incrementalKeyedStateHandle, null); + return new SnapshotResult<>(incrementalKeyedStateHandle, directoryKeyedStateHandle); --- End diff -- Exactly ð
---