rkhachatryan commented on code in PR #22669:
URL: https://github.com/apache/flink/pull/22669#discussion_r1210462542


##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##########
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
         assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
     }
 
+    @Test
+    public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+        StateHandleID handleID = new StateHandleID("1.sst");
+        StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+        StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+        SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+        UUID backendID = UUID.randomUUID();
+
+        IncrementalRemoteKeyedStateHandle handle1 =
+                new IncrementalRemoteKeyedStateHandle(
+                        backendID,
+                        KeyGroupRange.of(0, 0),
+                        1L,
+                        placeSpies(
+                                new HashMap<StateHandleID, StreamStateHandle>() {
+                                    {
+                                        put(handleID, streamHandle1);
+                                    }
+                                }),

Review Comment:
   Could this use `Collections.singletonMap` instead of the double-brace `HashMap`?
   
   Ditto for line 279.
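
A minimal sketch of the suggestion, using plain `String` keys and values in place of `StateHandleID` and `StreamStateHandle` so that it runs without Flink on the classpath. The class and method names are hypothetical. One caveat: `Collections.singletonMap` returns an immutable map, so this only applies if the test helper `placeSpies` does not modify its argument in place, which is an assumption here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SingletonMapSketch {
    /** Double-brace initialization, as in the test: creates an anonymous HashMap subclass. */
    static Map<String, String> doubleBrace(String key, String value) {
        return new HashMap<String, String>() {
            {
                put(key, value);
            }
        };
    }

    /** Same single entry via the JDK helper; the returned map is immutable. */
    static Map<String, String> singleton(String key, String value) {
        return Collections.singletonMap(key, value);
    }

    public static void main(String[] args) {
        Map<String, String> a = doubleBrace("1.sst", "file-1");
        Map<String, String> b = singleton("1.sst", "file-1");
        // Map equality is defined on the entries, not on the implementing class.
        System.out.println(a.equals(b));
    }
}
```

If a mutable map turns out to be required, `new HashMap<>(Collections.singletonMap(k, v))` keeps the one-liner without the anonymous subclass.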



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -395,18 +394,16 @@ public void release() {
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+        @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;
 
-        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+        protected PreviousSnapshot(
+                @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
             this.confirmedSstFiles = confirmedSstFiles;
         }
 
         protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
             if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                // we introduce a placeholder state handle, that is replaced with the
-                // original from the shared state registry (created from a previous checkpoint)
-                return Optional.of(
-                        new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   Could you elaborate on why you had to delete `PlaceholderStreamStateHandle`?
   
   I'm concerned that with this change, `ByteStreamStateHandle`s will always be
sent to the JM (regardless of whether they were previously sent or not).



##########
flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java:
##########
@@ -243,6 +244,60 @@ public void testNonEmptyIntersection() {
         assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
     }
 
+    @Test
+    public void testConcurrentCheckpointSharedStateRegistration() throws Exception {
+        StateHandleID handleID = new StateHandleID("1.sst");
+        StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'});
+        StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'});
+
+        SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+        UUID backendID = UUID.randomUUID();
+
+        IncrementalRemoteKeyedStateHandle handle1 =
+                new IncrementalRemoteKeyedStateHandle(
+                        backendID,
+                        KeyGroupRange.of(0, 0),
+                        1L,
+                        placeSpies(
+                                new HashMap<StateHandleID, StreamStateHandle>() {
+                                    {
+                                        put(handleID, streamHandle1);
+                                    }
+                                }),
+                        Collections.emptyMap(),
+                        new ByteStreamStateHandle("", new byte[] {'s'}));
+
+        handle1.registerSharedStates(registry, handle1.getCheckpointId());
+
+        IncrementalRemoteKeyedStateHandle handle2 =
+                new IncrementalRemoteKeyedStateHandle(
+                        backendID,
+                        KeyGroupRange.of(0, 0),
+                        2L,
+                        placeSpies(
+                                new HashMap<StateHandleID, StreamStateHandle>() {
+                                    {
+                                        put(handleID, streamHandle2);
+                                    }
+                                }),
+                        Collections.emptyMap(),
+                        new ByteStreamStateHandle("", new byte[] {'s'}));
+
+        handle2.registerSharedStates(registry, handle2.getCheckpointId());
+
+        registry.checkpointCompleted(1L);
+        registry.checkpointCompleted(2L);

Review Comment:
   Shouldn't this 2nd checkpoint be aborted? Otherwise, why would any state be 
discarded?



##########
flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java:
##########
@@ -395,18 +394,16 @@ public void release() {
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
+        @Nullable private final Map<StateHandleID, StreamStateHandle> confirmedSstFiles;
 
-        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) {
+        protected PreviousSnapshot(
+                @Nullable Map<StateHandleID, StreamStateHandle> confirmedSstFiles) {
             this.confirmedSstFiles = confirmedSstFiles;
         }
 
         protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) {
             if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) {
-                // we introduce a placeholder state handle, that is replaced with the
-                // original from the shared state registry (created from a previous checkpoint)
-                return Optional.of(
-                        new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));

Review Comment:
   (otherwise, the branching code here can be simplified with 
`Optional.ofNullable`)
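
A minimal sketch of that simplification, with `String` standing in for both `StateHandleID` and `StreamStateHandle` so it runs standalone. It assumes the new branch returns the stored handle directly (the diff does not show that line) and that the map never stores `null` values. The class and method names are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class GetUploadedSketch {
    // Stand-in for the nullable confirmedSstFiles map.
    private final Map<String, String> confirmed;

    GetUploadedSketch(Map<String, String> confirmed) {
        this.confirmed = confirmed;
    }

    /** Explicit branching, mirroring the shape of the method in the diff. */
    Optional<String> branching(String id) {
        if (confirmed != null && confirmed.containsKey(id)) {
            return Optional.of(confirmed.get(id));
        }
        return Optional.empty();
    }

    /** Same lookup: ofNullable turns a missing entry into Optional.empty(). */
    Optional<String> simplified(String id) {
        return confirmed == null ? Optional.empty() : Optional.ofNullable(confirmed.get(id));
    }

    public static void main(String[] args) {
        GetUploadedSketch s = new GetUploadedSketch(Collections.singletonMap("1.sst", "file-1"));
        System.out.println(s.simplified("1.sst").equals(s.branching("1.sst")));
        System.out.println(s.simplified("2.sst").isPresent());
    }
}
```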



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##########
@@ -361,9 +362,10 @@ IncrementalRemoteKeyedStateHandle copy() {
 
     /** Create a unique key to register one of our shared state handles. */
     @VisibleForTesting
-    public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+    public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) {
+        String keyString = handle.getStreamStateHandleID().getKeyString();
         return new SharedStateRegistryKey(
-                String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+                UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());

Review Comment:
   This won't work when multiple handles point to the same file (but use
different offsets), right?
   Should we switch to completely random IDs (related to neither remote nor
local file paths)?
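
A minimal sketch of the concern, assuming the key string carries only the file path and no offset (an assumption about `getKeyString()`, not something the diff confirms). `UUID.nameUUIDFromBytes` is deterministic, so two handles with the same key string would map to the same registry key, whereas `UUID.randomUUID` is independent of any path. The class name, method names, and path below are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class RegistryKeySketch {
    /** Name-based key, shaped like the derivation in the diff: same input, same key. */
    static String nameBasedKey(String keyString) {
        return UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString();
    }

    /** Random key, unrelated to any remote or local path. */
    static String randomKey() {
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        // Hypothetical path shared by two handles that differ only in offset.
        String sharedPath = "hypothetical/shared/file-1";
        String first = nameBasedKey(sharedPath);
        String second = nameBasedKey(sharedPath);
        // Both handles collapse to one key, so the registry cannot tell them apart.
        System.out.println(first.equals(second));
        // Random keys stay distinct, but must be stored with the handle to be found again.
        System.out.println(randomKey().equals(randomKey()));
    }
}
```

Random IDs avoid the collision at the cost of having to persist the ID alongside the handle, since it can no longer be recomputed from the path.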
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java:
##########
@@ -324,7 +325,7 @@ public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpo
         for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
                 sharedState.entrySet()) {
             SharedStateRegistryKey registryKey =
-                    createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
+                    createSharedStateRegistryKey(sharedStateHandle.getValue());

Review Comment:
   With this change, the key in `IncrementalRemoteKeyedStateHandle.sharedState`
is no longer really a key, just a local file name used on recovery.
   
   So keeping the map as is seems confusing to me.
   
   I think it would be clearer to keep the property that the
`SharedStateRegistry` key is the same as the key in this map. That would require
storing the local file path explicitly (which is good), for example by adding a
holder class for the `StreamStateHandle` and the local path.
   WDYT?
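
A minimal sketch of such a holder, purely illustrative: the class name `HandleAndLocalPathSketch` and its accessors are hypothetical and not part of Flink, and a type parameter `H` stands in for `StreamStateHandle` so the example compiles on its own.

```java
import java.util.Objects;

/** Pairs a remote state handle with the local file path it is restored to. */
public class HandleAndLocalPathSketch<H> {
    private final H handle;          // stand-in for StreamStateHandle
    private final String localPath;  // explicit local file name, e.g. "1.sst"

    public HandleAndLocalPathSketch(H handle, String localPath) {
        this.handle = Objects.requireNonNull(handle, "handle");
        this.localPath = Objects.requireNonNull(localPath, "localPath");
    }

    public H getHandle() {
        return handle;
    }

    public String getLocalPath() {
        return localPath;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof HandleAndLocalPathSketch)) {
            return false;
        }
        HandleAndLocalPathSketch<?> that = (HandleAndLocalPathSketch<?>) o;
        return handle.equals(that.handle) && localPath.equals(that.localPath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(handle, localPath);
    }

    public static void main(String[] args) {
        HandleAndLocalPathSketch<String> entry =
                new HandleAndLocalPathSketch<>("remote-handle", "1.sst");
        System.out.println(entry.getLocalPath());
    }
}
```

With a holder like this, the map in the keyed state handle could be keyed by the registry key itself, and the local path would travel with the value instead of doubling as the map key.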



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
