rkhachatryan commented on code in PR #22669: URL: https://github.com/apache/flink/pull/22669#discussion_r1210462542
########## flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java: ########## @@ -243,6 +244,60 @@ public void testNonEmptyIntersection() { assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId()); } + @Test + public void testConcurrentCheckpointSharedStateRegistration() throws Exception { + StateHandleID handleID = new StateHandleID("1.sst"); + StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'}); + StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'}); + + SharedStateRegistry registry = new SharedStateRegistryImpl(); + + UUID backendID = UUID.randomUUID(); + + IncrementalRemoteKeyedStateHandle handle1 = + new IncrementalRemoteKeyedStateHandle( + backendID, + KeyGroupRange.of(0, 0), + 1L, + placeSpies( + new HashMap<StateHandleID, StreamStateHandle>() { + { + put(handleID, streamHandle1); + } + }), Review Comment: `Collections.singletonMap`? ditto: line 279 ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ########## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. 
*/ protected static class PreviousSnapshot { - @Nullable private final Map<StateHandleID, Long> confirmedSstFiles; + @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles; - protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) { + protected PreviousSnapshot( + @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - return Optional.of( - new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: Could you elaborate on why you had to delete `PlaceholderStreamStateHandle`? I'm concerned that with this change, `ByteStreamStateHandle`s will always be sent to the JM (regardless of whether they were previously sent or not).
########## flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java: ########## @@ -243,6 +244,60 @@ public void testNonEmptyIntersection() { assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId()); } + @Test + public void testConcurrentCheckpointSharedStateRegistration() throws Exception { + StateHandleID handleID = new StateHandleID("1.sst"); + StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'}); + StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'}); + + SharedStateRegistry registry = new SharedStateRegistryImpl(); + + UUID backendID = UUID.randomUUID(); + + IncrementalRemoteKeyedStateHandle handle1 = + new IncrementalRemoteKeyedStateHandle( + backendID, + KeyGroupRange.of(0, 0), + 1L, + placeSpies( + new HashMap<StateHandleID, StreamStateHandle>() { + { + put(handleID, streamHandle1); + } + }), + Collections.emptyMap(), + new ByteStreamStateHandle("", new byte[] {'s'})); + + handle1.registerSharedStates(registry, handle1.getCheckpointId()); + + IncrementalRemoteKeyedStateHandle handle2 = + new IncrementalRemoteKeyedStateHandle( + backendID, + KeyGroupRange.of(0, 0), + 2L, + placeSpies( + new HashMap<StateHandleID, StreamStateHandle>() { + { + put(handleID, streamHandle2); + } + }), + Collections.emptyMap(), + new ByteStreamStateHandle("", new byte[] {'s'})); + + handle2.registerSharedStates(registry, handle2.getCheckpointId()); + + registry.checkpointCompleted(1L); + registry.checkpointCompleted(2L); Review Comment: Shouldn't this 2nd checkpoint be aborted? Otherwise, why would any state be discarded? ########## flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java: ########## @@ -395,18 +394,16 @@ public void release() { /** Previous snapshot with uploaded sst files. 
*/ protected static class PreviousSnapshot { - @Nullable private final Map<StateHandleID, Long> confirmedSstFiles; + @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles; - protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) { + protected PreviousSnapshot( + @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) { this.confirmedSstFiles = confirmedSstFiles; } protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) { if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - return Optional.of( - new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); Review Comment: (otherwise, the branching code here can be simplified with `Optional.ofNullable`) ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ########## @@ -361,9 +362,10 @@ IncrementalRemoteKeyedStateHandle copy() { /** Create a unique key to register one of our shared state handles. */ @VisibleForTesting - public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { + public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) { + String keyString = handle.getStreamStateHandleID().getKeyString(); return new SharedStateRegistryKey( - String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); + UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString()); Review Comment: This won't work when multiple handles point to the same file (but use different offsets), right? Should we switch to using completely random IDs (not related to remote nor local file paths)? 
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java: ########## @@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) { SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey()); + createSharedStateRegistryKey(sharedStateHandle.getValue()); Review Comment: With this change, the key in `IncrementalRemoteKeyedStateHandle.sharedState` is not actually a key anymore, but just a local file name used on recovery. So keeping it as is seems confusing to me. I think it would be clearer if we'd keep that property that SharedStateRegistry key is the same key as in this map. That would require storing the local file path explicitly (which is good), for example by adding a holder class for `StreamStateHandle` and the local path. WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org