This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c6c54412bf70af4432071dfdf131b62ff8236fe7
Author: wangfeifan <zoltar9...@163.com>
AuthorDate: Mon Jul 24 15:39:51 2023 +0800

    [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared 
state of IncrementalRemoteKeyedStateHandle
    
    Before this PR, IncrementalRemoteKeyedStateHandle used the local file
    name of each shared state as its SharedStateRegistryKey. As a result,
    handles registered under the same key were not necessarily equal (a file
    may be re-uploaded when maxConcurrentCheckpoint > 1, or when recovering
    from a retained checkpoint in no-claim mode). Such registry key
    collisions caused SharedStateRegistry to wrongly delete state handles.
    
    Co-authored-by: Roman <khachatryan.ro...@gmail.com>
---
 .../metadata/MetadataV2V3SerializerBase.java       |  31 ++---
 .../runtime/state/IncrementalKeyedStateHandle.java |  69 ++++++++-
 .../state/IncrementalLocalKeyedStateHandle.java    |  16 +--
 .../state/IncrementalRemoteKeyedStateHandle.java   |  86 ++++++------
 .../state/PlaceholderStreamStateHandle.java        |   7 +-
 .../changelog/ChangelogStateBackendHandle.java     |  31 +++--
 .../checkpoint/CheckpointCoordinatorTest.java      | 101 ++++++++------
 .../checkpoint/metadata/CheckpointTestUtils.java   |  14 +-
 .../runtime/scheduler/SchedulerUtilsTest.java      |  34 +++--
 .../IncrementalRemoteKeyedStateHandleTest.java     | 155 +++++++++++++--------
 .../runtime/state/SharedStateRegistryTest.java     |   4 +-
 .../state/RocksDBKeyedStateBackendBuilder.java     |   8 +-
 .../streaming/state/RocksDBStateDownloader.java    |  24 ++--
 .../streaming/state/RocksDBStateUploader.java      |  64 ++++-----
 .../RocksDBIncrementalRestoreOperation.java        |   4 +-
 .../state/restore/RocksDBRestoreResult.java        |  11 +-
 .../snapshot/RocksDBSnapshotStrategyBase.java      |  38 +++--
 .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 ++++++++--------
 .../snapshot/RocksNativeFullSnapshotStrategy.java  |  44 +++---
 .../state/snapshot/RocksSnapshotUtil.java          |  13 --
 .../state/EmbeddedRocksDBStateBackendTest.java     |  24 ++--
 .../state/RocksDBStateDownloaderTest.java          |  19 ++-
 .../streaming/state/RocksDBStateUploaderTest.java  |  36 ++---
 .../RocksIncrementalSnapshotStrategyTest.java      |  22 +--
 .../test/checkpointing/StateHandleReuseITCase.java |   6 +-
 25 files changed, 548 insertions(+), 433 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
index 37f22ee95ad..5f898359f96 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -341,8 +342,8 @@ public abstract class MetadataV2V3SerializerBase {
 
             
serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), 
dos);
 
-            
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), 
dos);
-            
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), 
dos);
+            
serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), 
dos);
+            
serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), 
dos);
 
             writeStateHandleId(incrementalKeyedStateHandle, dos);
         } else if (stateHandle instanceof ChangelogStateBackendHandle) {
@@ -548,10 +549,8 @@ public abstract class MetadataV2V3SerializerBase {
                 KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 
1);
 
         StreamStateHandle metaDataStateHandle = 
deserializeStreamStateHandle(dis, context);
-        Map<StateHandleID, StreamStateHandle> sharedStates =
-                deserializeStreamStateHandleMap(dis, context);
-        Map<StateHandleID, StreamStateHandle> privateStates =
-                deserializeStreamStateHandleMap(dis, context);
+        List<HandleAndLocalPath> sharedStates = 
deserializeHandleAndLocalPathList(dis, context);
+        List<HandleAndLocalPath> privateStates = 
deserializeHandleAndLocalPathList(dis, context);
 
         UUID uuid;
 
@@ -804,26 +803,26 @@ public abstract class MetadataV2V3SerializerBase {
         return new StateObjectCollection<>(result);
     }
 
-    private static void serializeStreamStateHandleMap(
-            Map<StateHandleID, StreamStateHandle> map, DataOutputStream dos) 
throws IOException {
+    private static void serializeHandleAndLocalPathList(
+            List<HandleAndLocalPath> list, DataOutputStream dos) throws 
IOException {
 
-        dos.writeInt(map.size());
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry : 
map.entrySet()) {
-            dos.writeUTF(entry.getKey().toString());
-            serializeStreamStateHandle(entry.getValue(), dos);
+        dos.writeInt(list.size());
+        for (HandleAndLocalPath handleAndLocalPath : list) {
+            dos.writeUTF(handleAndLocalPath.getLocalPath());
+            serializeStreamStateHandle(handleAndLocalPath.getHandle(), dos);
         }
     }
 
-    private static Map<StateHandleID, StreamStateHandle> 
deserializeStreamStateHandleMap(
+    private static List<HandleAndLocalPath> deserializeHandleAndLocalPathList(
             DataInputStream dis, @Nullable DeserializationContext context) 
throws IOException {
 
         final int size = dis.readInt();
-        Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+        List<HandleAndLocalPath> result = new ArrayList<>(size);
 
         for (int i = 0; i < size; ++i) {
-            StateHandleID stateHandleID = new StateHandleID(dis.readUTF());
+            String localPath = dis.readUTF();
             StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, 
context);
-            result.put(stateHandleID, stateHandle);
+            result.add(HandleAndLocalPath.of(stateHandle, localPath));
         }
 
         return result;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 76a6cd296dd..9ce3cbd332f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -20,9 +20,13 @@ package org.apache.flink.runtime.state;
 
 import javax.annotation.Nonnull;
 
-import java.util.Map;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /** Common interface to all incremental {@link KeyedStateHandle}. */
 public interface IncrementalKeyedStateHandle
         extends KeyedStateHandle, CheckpointBoundKeyedStateHandle {
@@ -32,9 +36,66 @@ public interface IncrementalKeyedStateHandle
     UUID getBackendIdentifier();
 
     /**
-     * Returns a set of ids of all registered shared states in the backend at 
the time this was
-     * created.
+     * Returns a list of all shared states and the corresponding localPath in 
the backend at the
+     * time this was created.
      */
     @Nonnull
-    Map<StateHandleID, StreamStateHandle> getSharedStateHandles();
+    List<HandleAndLocalPath> getSharedStateHandles();
+
+    /** A Holder of StreamStateHandle and the corresponding localPath. */
+    final class HandleAndLocalPath implements Serializable {
+
+        private static final long serialVersionUID = 7711754687567545052L;
+
+        StreamStateHandle handle;
+        final String localPath;
+
+        public static HandleAndLocalPath of(StreamStateHandle handle, String 
localPath) {
+            checkNotNull(handle, "streamStateHandle cannot be null");
+            checkNotNull(localPath, "localPath cannot be null");
+            return new HandleAndLocalPath(handle, localPath);
+        }
+
+        private HandleAndLocalPath(StreamStateHandle handle, String localPath) 
{
+            this.handle = handle;
+            this.localPath = localPath;
+        }
+
+        public StreamStateHandle getHandle() {
+            return this.handle;
+        }
+
+        public String getLocalPath() {
+            return this.localPath;
+        }
+
+        public long getStateSize() {
+            return this.handle.getStateSize();
+        }
+
+        /** Replace the StreamStateHandle with the registry returned one. */
+        public void replaceHandle(StreamStateHandle registryReturned) {
+            checkNotNull(registryReturned);
+            this.handle = registryReturned;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+
+            if (!(o instanceof HandleAndLocalPath)) {
+                return false;
+            }
+
+            HandleAndLocalPath that = (HandleAndLocalPath) o;
+            return this.handle.equals(that.handle) && 
this.localPath.equals(that.localPath);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(handle, localPath);
+        }
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index 85405dc649e..d782cf886bd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -23,8 +23,8 @@ import org.apache.flink.util.ExceptionUtils;
 import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 /**
@@ -46,8 +46,8 @@ public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle
     /** Handle to Flink's state meta data. */
     @Nonnull private final StreamStateHandle metaDataState;
 
-    /** Set with the ids of all shared state handles created by the 
checkpoint. */
-    @Nonnull private final Map<StateHandleID, StreamStateHandle> 
sharedStateHandleIDs;
+    /** All shared state handles and the corresponding localPath used by the 
checkpoint. */
+    @Nonnull private final List<HandleAndLocalPath> sharedState;
 
     public IncrementalLocalKeyedStateHandle(
             @Nonnull UUID backendIdentifier,
@@ -55,13 +55,13 @@ public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle
             @Nonnull DirectoryStateHandle directoryStateHandle,
             @Nonnull KeyGroupRange keyGroupRange,
             @Nonnull StreamStateHandle metaDataState,
-            @Nonnull Map<StateHandleID, StreamStateHandle> 
sharedStateHandleIDs) {
+            @Nonnull List<HandleAndLocalPath> sharedState) {
 
         super(directoryStateHandle, keyGroupRange);
         this.backendIdentifier = backendIdentifier;
         this.checkpointId = checkpointId;
         this.metaDataState = metaDataState;
-        this.sharedStateHandleIDs = new HashMap<>(sharedStateHandleIDs);
+        this.sharedState = new ArrayList<>(sharedState);
     }
 
     @Nonnull
@@ -93,8 +93,8 @@ public class IncrementalLocalKeyedStateHandle extends 
DirectoryKeyedStateHandle
 
     @Override
     @Nonnull
-    public Map<StateHandleID, StreamStateHandle> getSharedStateHandles() {
-        return sharedStateHandleIDs;
+    public List<HandleAndLocalPath> getSharedStateHandles() {
+        return sharedState;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
index 28813e7bd97..9318259fe62 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
@@ -26,8 +26,10 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
-import java.util.Map;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /**
  * The handle to states of an incremental snapshot.
@@ -77,10 +79,10 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
     private final long checkpointId;
 
     /** Shared state in the incremental checkpoint. */
-    private final Map<StateHandleID, StreamStateHandle> sharedState;
+    private final List<HandleAndLocalPath> sharedState;
 
     /** Private state in the incremental checkpoint. */
-    private final Map<StateHandleID, StreamStateHandle> privateState;
+    private final List<HandleAndLocalPath> privateState;
 
     /** Primary meta data state of the incremental checkpoint. */
     private final StreamStateHandle metaStateHandle;
@@ -103,8 +105,8 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
             UUID backendIdentifier,
             KeyGroupRange keyGroupRange,
             long checkpointId,
-            Map<StateHandleID, StreamStateHandle> sharedState,
-            Map<StateHandleID, StreamStateHandle> privateState,
+            List<HandleAndLocalPath> sharedState,
+            List<HandleAndLocalPath> privateState,
             StreamStateHandle metaStateHandle) {
         this(
                 backendIdentifier,
@@ -121,8 +123,8 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
             UUID backendIdentifier,
             KeyGroupRange keyGroupRange,
             long checkpointId,
-            Map<StateHandleID, StreamStateHandle> sharedState,
-            Map<StateHandleID, StreamStateHandle> privateState,
+            List<HandleAndLocalPath> sharedState,
+            List<HandleAndLocalPath> privateState,
             StreamStateHandle metaStateHandle,
             long persistedSizeOfThisCheckpoint) {
 
@@ -141,8 +143,8 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
             UUID backendIdentifier,
             KeyGroupRange keyGroupRange,
             long checkpointId,
-            Map<StateHandleID, StreamStateHandle> sharedState,
-            Map<StateHandleID, StreamStateHandle> privateState,
+            List<HandleAndLocalPath> sharedState,
+            List<HandleAndLocalPath> privateState,
             StreamStateHandle metaStateHandle,
             long persistedSizeOfThisCheckpoint,
             StateHandleID stateHandleId) {
@@ -164,8 +166,8 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
             UUID backendIdentifier,
             KeyGroupRange keyGroupRange,
             long checkpointId,
-            Map<StateHandleID, StreamStateHandle> sharedState,
-            Map<StateHandleID, StreamStateHandle> privateState,
+            List<HandleAndLocalPath> sharedState,
+            List<HandleAndLocalPath> privateState,
             StreamStateHandle metaStateHandle,
             long persistedSizeOfThisCheckpoint,
             StateHandleID stateHandleId) {
@@ -203,11 +205,11 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
                 stateHandleId);
     }
 
-    public Map<StateHandleID, StreamStateHandle> getSharedState() {
+    public List<HandleAndLocalPath> getSharedState() {
         return sharedState;
     }
 
-    public Map<StateHandleID, StreamStateHandle> getPrivateState() {
+    public List<HandleAndLocalPath> getPrivateState() {
         return privateState;
     }
 
@@ -222,7 +224,7 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
 
     @Nonnull
     @Override
-    public Map<StateHandleID, StreamStateHandle> getSharedStateHandles() {
+    public List<HandleAndLocalPath> getSharedStateHandles() {
         return getSharedState();
     }
 
@@ -262,7 +264,10 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
         }
 
         try {
-            StateUtil.bestEffortDiscardAllStateObjects(privateState.values());
+            StateUtil.bestEffortDiscardAllStateObjects(
+                    privateState.stream()
+                            .map(HandleAndLocalPath::getHandle)
+                            .collect(Collectors.toList()));
         } catch (Exception e) {
             LOG.warn("Could not properly discard misc file states.", e);
         }
@@ -270,7 +275,10 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
         // discard only on TM; on JM, shared state is removed on subsumption
         if (!isRegistered) {
             try {
-                
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+                StateUtil.bestEffortDiscardAllStateObjects(
+                        sharedState.stream()
+                                .map(HandleAndLocalPath::getHandle)
+                                .collect(Collectors.toList()));
             } catch (Exception e) {
                 LOG.warn("Could not properly discard new sst file states.", e);
             }
@@ -281,12 +289,12 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
     public long getStateSize() {
         long size = StateUtil.getStateSize(metaStateHandle);
 
-        for (StreamStateHandle sharedStateHandle : sharedState.values()) {
-            size += sharedStateHandle.getStateSize();
+        for (HandleAndLocalPath handleAndLocalPath : sharedState) {
+            size += handleAndLocalPath.getStateSize();
         }
 
-        for (StreamStateHandle privateStateHandle : privateState.values()) {
-            size += privateStateHandle.getStateSize();
+        for (HandleAndLocalPath handleAndLocalPath : privateState) {
+            size += handleAndLocalPath.getStateSize();
         }
 
         return size;
@@ -321,28 +329,18 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
                 checkpointId,
                 backendIdentifier);
 
-        for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle :
-                sharedState.entrySet()) {
-            SharedStateRegistryKey registryKey =
-                    
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
-
+        for (HandleAndLocalPath handleAndLocalPath : sharedState) {
             StreamStateHandle reference =
                     stateRegistry.registerReference(
-                            registryKey, sharedStateHandle.getValue(), 
checkpointID);
-
-            // This step consolidates our shared handles with the registry, 
which does two things:
-            //
-            // 1) Replace placeholder state handle with already registered, 
actual state handles.
-            //
-            // 2) Deduplicate re-uploads of incremental state due to missing 
confirmations about
-            // completed checkpoints.
-            //
-            // This prevents the following problem:
-            // A previous checkpoint n has already registered the state. This 
can happen if a
-            // following checkpoint (n + x) wants to reference the same state 
before the backend got
-            // notified that checkpoint n completed. In this case, the shared 
registry did
-            // deduplication and returns the previous reference.
-            sharedStateHandle.setValue(reference);
+                            
createSharedStateRegistryKey(handleAndLocalPath.getHandle()),
+                            handleAndLocalPath.getHandle(),
+                            checkpointID);
+
+            // This step consolidates our shared handles with the registry, 
which will replace
+            // placeholder state handle with already registered, actual state 
handles.
+            // Because of SharedStateRegistryKey is based on the physical id 
of the stream handle,
+            // no de-duplication will be performed. see FLINK-29913.
+            handleAndLocalPath.replaceHandle(reference);
         }
     }
 
@@ -359,11 +357,13 @@ public class IncrementalRemoteKeyedStateHandle implements 
IncrementalKeyedStateH
                 stateHandleId);
     }
 
-    /** Create a unique key to register one of our shared state handles. */
+    /** Create a unique key based on physical id to register one of our shared 
state handles. */
     @VisibleForTesting
-    public SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+    public SharedStateRegistryKey 
createSharedStateRegistryKey(StreamStateHandle handle) {
+        String keyString = handle.getStreamStateHandleID().getKeyString();
+        // key strings tend to be longer, so we use the MD5 of the key string 
to save memory
         return new SharedStateRegistryKey(
-                String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
+                
UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString());
     }
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index 4b42427de1c..a593ab0dbe2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -33,9 +33,11 @@ public class PlaceholderStreamStateHandle implements 
StreamStateHandle {
 
     private static final long serialVersionUID = 1L;
 
+    private final PhysicalStateHandleID physicalID;
     private final long stateSize;
 
-    public PlaceholderStreamStateHandle(long stateSize) {
+    public PlaceholderStreamStateHandle(PhysicalStateHandleID physicalID, long 
stateSize) {
+        this.physicalID = physicalID;
         this.stateSize = stateSize;
     }
 
@@ -53,8 +55,7 @@ public class PlaceholderStreamStateHandle implements 
StreamStateHandle {
 
     @Override
     public PhysicalStateHandleID getStreamStateHandleID() {
-        throw new UnsupportedOperationException(
-                "This is only a placeholder to be replaced by a real 
StreamStateHandle in the checkpoint coordinator.");
+        return physicalID;
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
index 6cf86ec65af..2d786c44d0d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.CheckpointBoundKeyedStateHandle;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle;
@@ -37,7 +38,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
@@ -175,18 +175,23 @@ public interface ChangelogStateBackendHandle
                 StreamStateHandle castMetaStateHandle =
                         restoreFileStateHandle(
                                 
incrementalRemoteKeyedStateHandle.getMetaStateHandle());
-                Map<StateHandleID, StreamStateHandle> castSharedStates =
-                        
incrementalRemoteKeyedStateHandle.getSharedState().entrySet().stream()
-                                .collect(
-                                        Collectors.toMap(
-                                                Map.Entry::getKey,
-                                                e -> 
restoreFileStateHandle(e.getValue())));
-                Map<StateHandleID, StreamStateHandle> castPrivateStates =
-                        
incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream()
-                                .collect(
-                                        Collectors.toMap(
-                                                Map.Entry::getKey,
-                                                e -> 
restoreFileStateHandle(e.getValue())));
+                List<HandleAndLocalPath> castSharedStates =
+                        
incrementalRemoteKeyedStateHandle.getSharedState().stream()
+                                .map(
+                                        e ->
+                                                HandleAndLocalPath.of(
+                                                        
restoreFileStateHandle(e.getHandle()),
+                                                        e.getLocalPath()))
+                                .collect(Collectors.toList());
+
+                List<HandleAndLocalPath> castPrivateStates =
+                        
incrementalRemoteKeyedStateHandle.getPrivateState().stream()
+                                .map(
+                                        e ->
+                                                HandleAndLocalPath.of(
+                                                        
restoreFileStateHandle(e.getHandle()),
+                                                        e.getLocalPath()))
+                                .collect(Collectors.toList());
 
                 return IncrementalRemoteKeyedStateHandle.restore(
                         
incrementalRemoteKeyedStateHandle.getBackendIdentifier(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 6749b9aa516..6fa95fb5380 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointStorageAccess;
 import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -58,7 +59,6 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestingStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -119,6 +119,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 
+import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED;
@@ -2907,11 +2908,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
             int sharedHandleCount = 0;
 
-            List<Map<StateHandleID, StreamStateHandle>> 
sharedHandlesByCheckpoint =
+            List<List<HandleAndLocalPath>> sharedHandlesByCheckpoint =
                     new ArrayList<>(numCheckpoints);
 
             for (int i = 0; i < numCheckpoints; ++i) {
-                sharedHandlesByCheckpoint.add(new HashMap<>(2));
+                sharedHandlesByCheckpoint.add(new ArrayList<>(2));
             }
 
             int cp = 0;
@@ -2929,20 +2930,23 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                             sharedHandlesByCheckpoint
                                     .get(cp)
-                                    
.putAll(incrementalKeyedStateHandle.getSharedState());
-
-                            for (StreamStateHandle streamStateHandle :
-                                    
incrementalKeyedStateHandle.getSharedState().values()) {
-                                assertTrue(
-                                        !(streamStateHandle
-                                                instanceof 
PlaceholderStreamStateHandle));
+                                    
.addAll(incrementalKeyedStateHandle.getSharedState());
+
+                            for (HandleAndLocalPath handleAndLocalPath :
+                                    
incrementalKeyedStateHandle.getSharedState()) {
+                                StreamStateHandle streamStateHandle =
+                                        handleAndLocalPath.getHandle();
+                                assertThat(
+                                                streamStateHandle
+                                                        instanceof 
PlaceholderStreamStateHandle)
+                                        .isFalse();
                                 verify(streamStateHandle, 
never()).discardState();
                                 ++sharedHandleCount;
                             }
 
-                            for (StreamStateHandle streamStateHandle :
-                                    
incrementalKeyedStateHandle.getPrivateState().values()) {
-                                verify(streamStateHandle, 
never()).discardState();
+                            for (HandleAndLocalPath handleAndLocalPath :
+                                    
incrementalKeyedStateHandle.getPrivateState()) {
+                                verify(handleAndLocalPath.getHandle(), 
never()).discardState();
                             }
 
                             
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
@@ -2962,11 +2966,10 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
             store.removeOldestCheckpoint();
 
             // we expect no shared state was discarded because the state of 
CP0 is still referenced
-            // by
-            // CP1
-            for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint) {
-                for (StreamStateHandle streamStateHandle : cpList.values()) {
-                    verify(streamStateHandle, never()).discardState();
+            // by CP1
+            for (List<HandleAndLocalPath> cpList : sharedHandlesByCheckpoint) {
+                for (HandleAndLocalPath handleAndLocalPath : cpList) {
+                    verify(handleAndLocalPath.getHandle(), 
never()).discardState();
                 }
             }
 
@@ -3941,26 +3944,35 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
             KeyGroupRange keyGroupRange = keyGroupPartitions1.get(index);
 
-            Map<StateHandleID, StreamStateHandle> privateState = new 
HashMap<>();
-            privateState.put(
-                    new StateHandleID("private-1"),
-                    spy(new ByteStreamStateHandle("private-1", new byte[] 
{'p'})));
+            List<HandleAndLocalPath> privateState = new ArrayList<>();
+            privateState.add(
+                    HandleAndLocalPath.of(
+                            spy(new ByteStreamStateHandle("private-1", new 
byte[] {'p'})),
+                            "private-1"));
 
-            Map<StateHandleID, StreamStateHandle> sharedState = new 
HashMap<>();
+            List<HandleAndLocalPath> sharedState = new ArrayList<>();
 
             // let all but the first CP overlap by one shared state.
             if (cpSequenceNumber > 0) {
-                sharedState.put(
-                        new StateHandleID("shared-" + (cpSequenceNumber - 1)),
-                        spy(new PlaceholderStreamStateHandle(1L)));
+                sharedState.add(
+                        HandleAndLocalPath.of(
+                                spy(
+                                        new ByteStreamStateHandle(
+                                                "shared-"
+                                                        + (cpSequenceNumber - 
1)
+                                                        + "-"
+                                                        + keyGroupRange,
+                                                new byte[] {'s'})),
+                                "shared-" + (cpSequenceNumber - 1)));
             }
 
-            sharedState.put(
-                    new StateHandleID("shared-" + cpSequenceNumber),
-                    spy(
-                            new ByteStreamStateHandle(
-                                    "shared-" + cpSequenceNumber + "-" + 
keyGroupRange,
-                                    new byte[] {'s'})));
+            sharedState.add(
+                    HandleAndLocalPath.of(
+                            spy(
+                                    new ByteStreamStateHandle(
+                                            "shared-" + cpSequenceNumber + "-" 
+ keyGroupRange,
+                                            new byte[] {'s'})),
+                            "shared-" + cpSequenceNumber));
 
             IncrementalRemoteKeyedStateHandle managedState =
                     spy(
@@ -4096,15 +4108,15 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
     }
 
     private static void verifyDiscard(
-            List<Map<StateHandleID, StreamStateHandle>> 
sharedHandlesByCheckpoint1,
+            List<List<HandleAndLocalPath>> sharedHandles,
             Function<Integer, VerificationMode> checkpointVerify)
             throws Exception {
-        for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint1) {
-            for (Map.Entry<StateHandleID, StreamStateHandle> entry : 
cpList.entrySet()) {
-                String key = entry.getKey().getKeyString();
+        for (List<HandleAndLocalPath> cpList : sharedHandles) {
+            for (HandleAndLocalPath handleAndLocalPath : cpList) {
+                String key = handleAndLocalPath.getLocalPath();
                 int checkpointID = 
Integer.parseInt(String.valueOf(key.charAt(key.length() - 1)));
                 VerificationMode verificationMode = 
checkpointVerify.apply(checkpointID);
-                verify(entry.getValue(), verificationMode).discardState();
+                verify(handleAndLocalPath.getHandle(), 
verificationMode).discardState();
             }
         }
     }
@@ -4139,10 +4151,13 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
             TestingStreamStateHandle privateState,
             TestingStreamStateHandle sharedState)
             throws CheckpointException {
-        Map<StateHandleID, StreamStateHandle> sharedStateMap =
-                new HashMap<>(singletonMap(new 
StateHandleID("shared-state-key"), sharedState));
-        Map<StateHandleID, StreamStateHandle> privateStateMap =
-                new HashMap<>(singletonMap(new 
StateHandleID("private-state-key"), privateState));
+        List<HandleAndLocalPath> sharedStateList =
+                new ArrayList<>(
+                        singletonList(HandleAndLocalPath.of(sharedState, 
"shared-state-key")));
+        List<HandleAndLocalPath> privateStateList =
+                new ArrayList<>(
+                        singletonList(HandleAndLocalPath.of(privateState, 
"private-state-key")));
+
         ExecutionJobVertex jobVertex = graph.getJobVertex(ackVertexID);
         OperatorID operatorID = 
jobVertex.getOperatorIDs().get(0).getGeneratedOperatorID();
         coordinator.receiveAcknowledgeMessage(
@@ -4160,8 +4175,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                                                 
UUID.randomUUID(),
                                                                 
KeyGroupRange.of(0, 9),
                                                                 checkpointId,
-                                                                sharedStateMap,
-                                                                
privateStateMap,
+                                                                
sharedStateList,
+                                                                
privateStateList,
                                                                 metaState))
                                                 .build()))),
                 "test");
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
index 0a0cb1c5934..a124a6f6dbb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -256,18 +256,18 @@ public class CheckpointTestUtils {
                 createRandomUUID(rnd),
                 new KeyGroupRange(1, 1),
                 checkpointId,
-                createRandomStateHandleMap(rnd),
-                createRandomStateHandleMap(rnd),
+                createRandomHandleAndLocalPathList(rnd),
+                createRandomHandleAndLocalPathList(rnd),
                 createDummyStreamStateHandle(rnd, null));
     }
 
-    public static Map<StateHandleID, StreamStateHandle> 
createRandomStateHandleMap(Random rnd) {
+    public static List<HandleAndLocalPath> 
createRandomHandleAndLocalPathList(Random rnd) {
         final int size = rnd.nextInt(4);
-        Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
+        List<HandleAndLocalPath> result = new ArrayList<>(size);
         for (int i = 0; i < size; ++i) {
-            StateHandleID randomId = new 
StateHandleID(createRandomUUID(rnd).toString());
+            String localPath = createRandomUUID(rnd).toString();
             StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd, 
null);
-            result.put(randomId, stateHandle);
+            result.add(HandleAndLocalPath.of(stateHandle, localPath));
         }
 
         return result;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 9e21dd42e08..70421734f65 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -33,13 +33,13 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.RestoreMode;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
@@ -48,14 +48,12 @@ import org.apache.flink.util.concurrent.Executors;
 
 import org.junit.Test;
 
-import java.util.HashMap;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Executor;
 
 import static java.util.Collections.emptyList;
-import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
@@ -94,11 +92,11 @@ public class SchedulerUtilsTest extends TestLogger {
     @Test
     public void testSharedStateRegistration() throws Exception {
         UUID backendId = UUID.randomUUID();
-        StateHandleID key = new StateHandleID("k0");
+        String localPath = "k0";
         StreamStateHandle handle = new ByteStreamStateHandle("h0", new byte[] 
{1, 2, 3});
         CheckpointRecoveryFactory recoveryFactory =
                 buildRecoveryFactory(
-                        buildCheckpoint(buildIncrementalHandle(key, handle, 
backendId)));
+                        buildCheckpoint(buildIncrementalHandle(localPath, 
handle, backendId)));
 
         CompletedCheckpointStore checkpointStore =
                 SchedulerUtils.createCompletedCheckpointStore(
@@ -112,10 +110,20 @@ public class SchedulerUtilsTest extends TestLogger {
         SharedStateRegistry sharedStateRegistry = 
checkpointStore.getSharedStateRegistry();
 
         IncrementalRemoteKeyedStateHandle newHandle =
-                buildIncrementalHandle(key, new 
PlaceholderStreamStateHandle(1L), backendId);
+                buildIncrementalHandle(
+                        localPath,
+                        new PlaceholderStreamStateHandle(
+                                handle.getStreamStateHandleID(), 
handle.getStateSize()),
+                        backendId);
         newHandle.registerSharedStates(sharedStateRegistry, 1L);
 
-        assertSame(handle, newHandle.getSharedState().get(key));
+        assertSame(
+                handle,
+                newHandle.getSharedState().stream()
+                        .filter(e -> e.getLocalPath().equals(localPath))
+                        .findFirst()
+                        .get()
+                        .getHandle());
     }
 
     private CheckpointRecoveryFactory buildRecoveryFactory(CompletedCheckpoint 
checkpoint) {
@@ -160,16 +168,16 @@ public class SchedulerUtilsTest extends TestLogger {
     }
 
     private IncrementalRemoteKeyedStateHandle buildIncrementalHandle(
-            StateHandleID key, StreamStateHandle shared, UUID 
backendIdentifier) {
+            String localPath, StreamStateHandle shared, UUID 
backendIdentifier) {
         StreamStateHandle meta = new ByteStreamStateHandle("meta", new byte[] 
{1, 2, 3});
-        Map<StateHandleID, StreamStateHandle> sharedStateMap = new HashMap<>();
-        sharedStateMap.put(key, shared);
+        List<HandleAndLocalPath> sharedState = new ArrayList<>(1);
+        sharedState.add(HandleAndLocalPath.of(shared, localPath));
         return new IncrementalRemoteKeyedStateHandle(
                 backendIdentifier,
                 KeyGroupRange.EMPTY_KEY_GROUP_RANGE,
                 1,
-                sharedStateMap,
-                emptyMap(),
+                sharedState,
+                emptyList(),
                 meta);
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
index b1128786e1e..b73e59f7dca 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
@@ -19,14 +19,18 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -48,12 +52,12 @@ public class IncrementalRemoteKeyedStateHandleTest {
 
         stateHandle.discardState();
 
-        for (StreamStateHandle handle : 
stateHandle.getPrivateState().values()) {
-            verify(handle).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle.getPrivateState()) {
+            verify(handleAndLocalPath.getHandle()).discardState();
         }
 
-        for (StreamStateHandle handle : stateHandle.getSharedState().values()) 
{
-            verify(handle).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle.getSharedState()) {
+            verify(handleAndLocalPath.getHandle()).discardState();
         }
 
         verify(stateHandle.getMetaStateHandle()).discardState();
@@ -73,13 +77,11 @@ public class IncrementalRemoteKeyedStateHandleTest {
         IncrementalRemoteKeyedStateHandle stateHandle2 = create(new 
Random(42));
 
         // Both handles should not be registered and not discarded by now.
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry :
-                stateHandle1.getSharedState().entrySet()) {
-            verify(entry.getValue(), times(0)).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle1.getSharedState()) {
+            verify(handleAndLocalPath.getHandle(), times(0)).discardState();
         }
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry :
-                stateHandle2.getSharedState().entrySet()) {
-            verify(entry.getValue(), times(0)).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle2.getSharedState()) {
+            verify(handleAndLocalPath.getHandle(), times(0)).discardState();
         }
 
         // Now we register both ...
@@ -87,48 +89,40 @@ public class IncrementalRemoteKeyedStateHandleTest {
         registry.checkpointCompleted(0L);
         stateHandle2.registerSharedStates(registry, 0L);
 
-        for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry :
-                stateHandle1.getSharedState().entrySet()) {
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle1.getSharedState()) {
+            StreamStateHandle handle = handleAndLocalPath.getHandle();
 
-            SharedStateRegistryKey registryKey =
-                    stateHandle1.createSharedStateRegistryKeyFromFileName(
-                            stateHandleEntry.getKey());
+            SharedStateRegistryKey registryKey = 
stateHandle1.createSharedStateRegistryKey(handle);
 
-            verify(registry).registerReference(registryKey, 
stateHandleEntry.getValue(), 0L);
+            verify(registry).registerReference(registryKey, handle, 0L);
         }
 
-        for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry :
-                stateHandle2.getSharedState().entrySet()) {
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle2.getSharedState()) {
+            StreamStateHandle handle = handleAndLocalPath.getHandle();
 
-            SharedStateRegistryKey registryKey =
-                    stateHandle1.createSharedStateRegistryKeyFromFileName(
-                            stateHandleEntry.getKey());
+            SharedStateRegistryKey registryKey = 
stateHandle2.createSharedStateRegistryKey(handle);
 
-            verify(registry).registerReference(registryKey, 
stateHandleEntry.getValue(), 0L);
+            verify(registry).registerReference(registryKey, handle, 0L);
         }
 
         // We discard the first
         stateHandle1.discardState();
 
         // Should be unregistered, non-shared discarded, shared not discarded
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry :
-                stateHandle1.getSharedState().entrySet()) {
-            verify(entry.getValue(), times(0)).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle1.getSharedState()) {
+            verify(handleAndLocalPath.getHandle(), times(0)).discardState();
         }
 
-        for (StreamStateHandle handle : 
stateHandle2.getSharedState().values()) {
-
-            verify(handle, times(0)).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle2.getSharedState()) {
+            verify(handleAndLocalPath.getHandle(), times(0)).discardState();
         }
 
-        for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
-                stateHandle1.getPrivateState().entrySet()) {
-            verify(handleEntry.getValue(), times(1)).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle1.getPrivateState()) {
+            verify(handleAndLocalPath.getHandle(), times(1)).discardState();
         }
 
-        for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry :
-                stateHandle2.getPrivateState().entrySet()) {
-            verify(handleEntry.getValue(), times(0)).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle2.getPrivateState()) {
+            verify(handleAndLocalPath.getHandle(), times(0)).discardState();
         }
 
         verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
@@ -139,14 +133,12 @@ public class IncrementalRemoteKeyedStateHandleTest {
 
         // Now everything should be unregistered and discarded
         registry.unregisterUnusedState(Long.MAX_VALUE);
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry :
-                stateHandle1.getSharedState().entrySet()) {
-            verify(entry.getValue()).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle1.getSharedState()) {
+            verify(handleAndLocalPath.getHandle()).discardState();
         }
 
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry :
-                stateHandle2.getSharedState().entrySet()) {
-            verify(entry.getValue()).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandle2.getSharedState()) {
+            verify(handleAndLocalPath.getHandle()).discardState();
         }
 
         verify(stateHandle1.getMetaStateHandle(), times(1)).discardState();
@@ -209,14 +201,14 @@ public class IncrementalRemoteKeyedStateHandleTest {
         // Should be completely discarded because it is tracked through the 
new registry
         sharedStateRegistryB.unregisterUnusedState(1L);
 
-        for (StreamStateHandle stateHandle : 
stateHandleX.getSharedState().values()) {
-            verify(stateHandle, times(1)).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandleX.getSharedState()) {
+            verify(handleAndLocalPath.getHandle(), times(1)).discardState();
         }
-        for (StreamStateHandle stateHandle : 
stateHandleY.getSharedState().values()) {
-            verify(stateHandle, never()).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandleY.getSharedState()) {
+            verify(handleAndLocalPath.getHandle(), never()).discardState();
         }
-        for (StreamStateHandle stateHandle : 
stateHandleZ.getSharedState().values()) {
-            verify(stateHandle, never()).discardState();
+        for (HandleAndLocalPath handleAndLocalPath : 
stateHandleZ.getSharedState()) {
+            verify(handleAndLocalPath.getHandle(), never()).discardState();
         }
         sharedStateRegistryB.close();
     }
@@ -242,13 +234,63 @@ public class IncrementalRemoteKeyedStateHandleTest {
         assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId());
     }
 
+    @Test
+    public void testConcurrentCheckpointSharedStateRegistration() throws 
Exception {
+        String localPath = "1.sst";
+        StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", 
new byte[] {'s'});
+        StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", 
new byte[] {'s'});
+
+        SharedStateRegistry registry = new SharedStateRegistryImpl();
+
+        UUID backendID = UUID.randomUUID();
+
+        IncrementalRemoteKeyedStateHandle handle1 =
+                new IncrementalRemoteKeyedStateHandle(
+                        backendID,
+                        KeyGroupRange.of(0, 0),
+                        1L,
+                        placeSpies(
+                                Collections.singletonList(
+                                        HandleAndLocalPath.of(streamHandle1, 
localPath))),
+                        Collections.emptyList(),
+                        new ByteStreamStateHandle("", new byte[] {'s'}));
+
+        handle1.registerSharedStates(registry, handle1.getCheckpointId());
+
+        IncrementalRemoteKeyedStateHandle handle2 =
+                new IncrementalRemoteKeyedStateHandle(
+                        backendID,
+                        KeyGroupRange.of(0, 0),
+                        2L,
+                        placeSpies(
+                                Collections.singletonList(
+                                        HandleAndLocalPath.of(streamHandle2, 
localPath))),
+                        Collections.emptyList(),
+                        new ByteStreamStateHandle("", new byte[] {'s'}));
+
+        handle2.registerSharedStates(registry, handle2.getCheckpointId());
+
+        registry.checkpointCompleted(1L);
+
+        // checkpoint 2 failed
+        handle2.discardState();
+
+        for (HandleAndLocalPath handleAndLocalPath : handle1.getSharedState()) 
{
+            verify(handleAndLocalPath.getHandle(), never()).discardState();
+        }
+        for (HandleAndLocalPath handleAndLocalPath : handle2.getSharedState()) 
{
+            verify(handleAndLocalPath.getHandle(), never()).discardState();
+        }
+        registry.close();
+    }
+
     private static IncrementalRemoteKeyedStateHandle create(Random rnd) {
         return new IncrementalRemoteKeyedStateHandle(
                 UUID.nameUUIDFromBytes("test".getBytes()),
                 KeyGroupRange.of(0, 0),
                 1L,
-                
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
-                
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+                
placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)),
+                
placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)),
                 spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd, 
null)));
     }
 
@@ -257,18 +299,15 @@ public class IncrementalRemoteKeyedStateHandleTest {
                 UUID.nameUUIDFromBytes("test".getBytes()),
                 KeyGroupRange.of(0, 0),
                 1L,
-                
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
-                
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
+                
placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)),
+                
placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)),
                 spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd, 
null)),
                 checkpointedSize);
     }
 
-    private static Map<StateHandleID, StreamStateHandle> placeSpies(
-            Map<StateHandleID, StreamStateHandle> map) {
-
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry : 
map.entrySet()) {
-            entry.setValue(spy(entry.getValue()));
-        }
-        return map;
+    private static List<HandleAndLocalPath> 
placeSpies(List<HandleAndLocalPath> list) {
+        return list.stream()
+                .map(e -> HandleAndLocalPath.of(spy(e.getHandle()), 
e.getLocalPath()))
+                .collect(Collectors.toList());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 625b3a8ea64..959d7b34243 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -290,8 +290,8 @@ public class SharedStateRegistryTest {
                         UUID.randomUUID(),
                         KeyGroupRange.EMPTY_KEY_GROUP_RANGE,
                         1L,
-                        Collections.emptyMap(),
-                        Collections.emptyMap(),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
                         new ByteStreamStateHandle("meta", new byte[1]),
                         1024L,
                         new StateHandleID(stateId));
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
index 3df43d7224e..6b3eaee04ea 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java
@@ -38,14 +38,13 @@ import 
org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
 import org.apache.flink.runtime.state.BackendBuildingException;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.PriorityQueueSetFactory;
 import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
 import 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
 import org.apache.flink.runtime.state.heap.InternalKeyContext;
@@ -306,8 +305,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
         try {
             // Variables for snapshot strategy when incremental checkpoint is 
enabled
             UUID backendUID = UUID.randomUUID();
-            SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
materializedSstFiles =
-                    new TreeMap<>();
+            SortedMap<Long, Collection<HandleAndLocalPath>> 
materializedSstFiles = new TreeMap<>();
             long lastCompletedCheckpointId = -1L;
             if (injectedTestDB != null) {
                 db = injectedTestDB;
@@ -525,7 +523,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends 
AbstractKeyedStateBacken
             int keyGroupPrefixBytes,
             RocksDB db,
             UUID backendUID,
-            SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
materializedSstFiles,
+            SortedMap<Long, Collection<HandleAndLocalPath>> 
materializedSstFiles,
             long lastCompletedCheckpointId) {
         RocksDBSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy;
         RocksDBStateUploader stateUploader =
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
index d06790ced8a..6137ed4112e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java
@@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -33,7 +34,6 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
@@ -56,9 +56,8 @@ public class RocksDBStateDownloader extends 
RocksDBStateDataTransfer {
             CloseableRegistry closeableRegistry)
             throws Exception {
 
-        final Map<StateHandleID, StreamStateHandle> sstFiles = 
restoreStateHandle.getSharedState();
-        final Map<StateHandleID, StreamStateHandle> miscFiles =
-                restoreStateHandle.getPrivateState();
+        final List<HandleAndLocalPath> sstFiles = 
restoreStateHandle.getSharedState();
+        final List<HandleAndLocalPath> miscFiles = 
restoreStateHandle.getPrivateState();
 
         downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry);
         downloadDataForAllStateHandles(miscFiles, dest, closeableRegistry);
@@ -69,14 +68,15 @@ public class RocksDBStateDownloader extends 
RocksDBStateDataTransfer {
      * files w.r.t. their {@link StateHandleID}.
      */
     private void downloadDataForAllStateHandles(
-            Map<StateHandleID, StreamStateHandle> stateHandleMap,
+            List<HandleAndLocalPath> handleWithPaths,
             Path restoreInstancePath,
             CloseableRegistry closeableRegistry)
             throws Exception {
 
         try {
             List<Runnable> runnables =
-                    createDownloadRunnables(stateHandleMap, 
restoreInstancePath, closeableRegistry);
+                    createDownloadRunnables(
+                            handleWithPaths, restoreInstancePath, 
closeableRegistry);
             List<CompletableFuture<Void>> futures = new 
ArrayList<>(runnables.size());
             for (Runnable runnable : runnables) {
                 futures.add(CompletableFuture.runAsync(runnable, 
executorService));
@@ -94,15 +94,15 @@ public class RocksDBStateDownloader extends 
RocksDBStateDataTransfer {
     }
 
     private List<Runnable> createDownloadRunnables(
-            Map<StateHandleID, StreamStateHandle> stateHandleMap,
+            List<HandleAndLocalPath> handleWithPaths,
             Path restoreInstancePath,
             CloseableRegistry closeableRegistry) {
-        List<Runnable> runnables = new ArrayList<>(stateHandleMap.size());
-        for (Map.Entry<StateHandleID, StreamStateHandle> entry : 
stateHandleMap.entrySet()) {
-            StateHandleID stateHandleID = entry.getKey();
-            StreamStateHandle remoteFileHandle = entry.getValue();
+        List<Runnable> runnables = new ArrayList<>(handleWithPaths.size());
+        for (HandleAndLocalPath handleAndLocalPath : handleWithPaths) {
 
-            Path path = restoreInstancePath.resolve(stateHandleID.toString());
+            StreamStateHandle remoteFileHandle = 
handleAndLocalPath.getHandle();
+
+            Path path = 
restoreInstancePath.resolve(handleAndLocalPath.getLocalPath());
 
             runnables.add(
                     ThrowingRunnable.unchecked(
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
index 02270e82d7d..983450fad75 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java
@@ -22,7 +22,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.StateHandleID;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.ExceptionUtils;
@@ -37,11 +37,11 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /** Help class for uploading RocksDB state files. */
 public class RocksDBStateUploader extends RocksDBStateDataTransfer {
@@ -59,17 +59,15 @@ public class RocksDBStateUploader extends 
RocksDBStateDataTransfer {
      * @param stateScope
      * @throws Exception Thrown if can not upload all the files.
      */
-    public Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs(
-            @Nonnull Map<StateHandleID, Path> files,
+    public List<HandleAndLocalPath> uploadFilesToCheckpointFs(
+            @Nonnull List<Path> files,
             CheckpointStreamFactory checkpointStreamFactory,
             CheckpointedStateScope stateScope,
             CloseableRegistry closeableRegistry,
             CloseableRegistry tmpResourcesRegistry)
             throws Exception {
 
-        Map<StateHandleID, StreamStateHandle> handles = new HashMap<>();
-
-        Map<StateHandleID, CompletableFuture<StreamStateHandle>> futures =
+        List<CompletableFuture<HandleAndLocalPath>> futures =
                 createUploadFutures(
                         files,
                         checkpointStreamFactory,
@@ -77,12 +75,13 @@ public class RocksDBStateUploader extends 
RocksDBStateDataTransfer {
                         closeableRegistry,
                         tmpResourcesRegistry);
 
+        List<HandleAndLocalPath> handles = new ArrayList<>(files.size());
+
         try {
-            FutureUtils.waitForAll(futures.values()).get();
+            FutureUtils.waitForAll(futures).get();
 
-            for (Map.Entry<StateHandleID, 
CompletableFuture<StreamStateHandle>> entry :
-                    futures.entrySet()) {
-                handles.put(entry.getKey(), entry.getValue().get());
+            for (CompletableFuture<HandleAndLocalPath> future : futures) {
+                handles.add(future.get());
             }
         } catch (ExecutionException e) {
             Throwable throwable = ExceptionUtils.stripExecutionException(e);
@@ -97,32 +96,29 @@ public class RocksDBStateUploader extends 
RocksDBStateDataTransfer {
         return handles;
     }
 
-    private Map<StateHandleID, CompletableFuture<StreamStateHandle>> 
createUploadFutures(
-            Map<StateHandleID, Path> files,
+    private List<CompletableFuture<HandleAndLocalPath>> createUploadFutures(
+            List<Path> files,
             CheckpointStreamFactory checkpointStreamFactory,
             CheckpointedStateScope stateScope,
             CloseableRegistry closeableRegistry,
             CloseableRegistry tmpResourcesRegistry) {
-        Map<StateHandleID, CompletableFuture<StreamStateHandle>> futures =
-                new HashMap<>(files.size());
-
-        for (Map.Entry<StateHandleID, Path> entry : files.entrySet()) {
-            final Supplier<StreamStateHandle> supplier =
-                    CheckedSupplier.unchecked(
-                            () ->
-                                    uploadLocalFileToCheckpointFs(
-                                            entry.getValue(),
-                                            checkpointStreamFactory,
-                                            stateScope,
-                                            closeableRegistry,
-                                            tmpResourcesRegistry));
-            futures.put(entry.getKey(), 
CompletableFuture.supplyAsync(supplier, executorService));
-        }
-
-        return futures;
+        return files.stream()
+                .map(
+                        e ->
+                                CompletableFuture.supplyAsync(
+                                        CheckedSupplier.unchecked(
+                                                () ->
+                                                        
uploadLocalFileToCheckpointFs(
+                                                                e,
+                                                                
checkpointStreamFactory,
+                                                                stateScope,
+                                                                
closeableRegistry,
+                                                                
tmpResourcesRegistry)),
+                                        executorService))
+                .collect(Collectors.toList());
     }
 
-    private StreamStateHandle uploadLocalFileToCheckpointFs(
+    private HandleAndLocalPath uploadLocalFileToCheckpointFs(
             Path filePath,
             CheckpointStreamFactory checkpointStreamFactory,
             CheckpointedStateScope stateScope,
@@ -161,7 +157,7 @@ public class RocksDBStateUploader extends 
RocksDBStateDataTransfer {
             }
             tmpResourcesRegistry.registerCloseable(
                     () -> StateUtil.discardStateObjectQuietly(result));
-            return result;
+            return HandleAndLocalPath.of(result, 
filePath.getFileName().toString());
 
         } finally {
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
index d6ec9ae6055..5a89403617e 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java
@@ -36,13 +36,13 @@ import 
org.apache.flink.runtime.state.BackendBuildingException;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateSerializerProvider;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
@@ -86,7 +86,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements 
RocksDBRestoreOper
             LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class);
 
     private final String operatorIdentifier;
-    private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
restoredSstFiles;
+    private final SortedMap<Long, Collection<HandleAndLocalPath>> 
restoredSstFiles;
     private final RocksDBHandle rocksHandle;
     private final Collection<KeyedStateHandle> restoreStateHandles;
     private final CloseableRegistry cancelStreamRegistry;
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
index 87f124fe6d0..ad17b6f2769 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java
@@ -19,13 +19,12 @@
 package org.apache.flink.contrib.streaming.state.restore;
 
 import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 
 import org.rocksdb.ColumnFamilyHandle;
 import org.rocksdb.RocksDB;
 
-import java.util.Map;
+import java.util.Collection;
 import java.util.SortedMap;
 import java.util.UUID;
 
@@ -38,7 +37,7 @@ public class RocksDBRestoreResult {
     // fields only for incremental restore
     private final long lastCompletedCheckpointId;
     private final UUID backendUID;
-    private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
restoredSstFiles;
+    private final SortedMap<Long, Collection<HandleAndLocalPath>> 
restoredSstFiles;
 
     public RocksDBRestoreResult(
             RocksDB db,
@@ -46,7 +45,7 @@ public class RocksDBRestoreResult {
             RocksDBNativeMetricMonitor nativeMetricMonitor,
             long lastCompletedCheckpointId,
             UUID backendUID,
-            SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
restoredSstFiles) {
+            SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles) {
         this.db = db;
         this.defaultColumnFamilyHandle = defaultColumnFamilyHandle;
         this.nativeMetricMonitor = nativeMetricMonitor;
@@ -67,7 +66,7 @@ public class RocksDBRestoreResult {
         return backendUID;
     }
 
-    public SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
getRestoredSstFiles() {
+    public SortedMap<Long, Collection<HandleAndLocalPath>> 
getRestoredSstFiles() {
         return restoredSstFiles;
     }
 
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index 6db43289cad..9cb73d01dc9 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
@@ -39,7 +40,6 @@ import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResources;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.SnapshotStrategy;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
@@ -61,12 +61,14 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /**
  * Abstract base class for {@link SnapshotStrategy} implementations for 
RocksDB state backend.
@@ -334,7 +336,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
 
         protected Optional<KeyedStateHandle> getLocalSnapshot(
                 @Nullable StreamStateHandle localStreamStateHandle,
-                Map<StateHandleID, StreamStateHandle> sharedStateHandleIDs)
+                List<HandleAndLocalPath> sharedState)
                 throws IOException {
             final DirectoryStateHandle directoryStateHandle =
                     localBackupDirectory.completeSnapshotAndGetHandle();
@@ -346,7 +348,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
                                 directoryStateHandle,
                                 keyGroupRange,
                                 localStreamStateHandle,
-                                sharedStateHandleIDs));
+                                sharedState));
             } else {
                 return Optional.empty();
             }
@@ -390,23 +392,33 @@ public abstract class RocksDBSnapshotStrategyBase<K, R 
extends SnapshotResources
     }
 
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-            new PreviousSnapshot(Collections.emptyMap());
+            new PreviousSnapshot(Collections.emptyList());
 
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
-        @Nullable private final Map<StateHandleID, Long> confirmedSstFiles;
-
-        protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> 
confirmedSstFiles) {
-            this.confirmedSstFiles = confirmedSstFiles;
+        @Nonnull private final Map<String, StreamStateHandle> 
confirmedSstFiles;
+
+        protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> 
confirmedSstFiles) {
+            this.confirmedSstFiles =
+                    confirmedSstFiles != null
+                            ? confirmedSstFiles.stream()
+                                    .collect(
+                                            Collectors.toMap(
+                                                    
HandleAndLocalPath::getLocalPath,
+                                                    
HandleAndLocalPath::getHandle))
+                            : Collections.emptyMap();
         }
 
-        protected Optional<StreamStateHandle> getUploaded(StateHandleID 
stateHandleID) {
-            if (confirmedSstFiles != null && 
confirmedSstFiles.containsKey(stateHandleID)) {
-                // we introduce a placeholder state handle, that is replaced 
with the
-                // original from the shared state registry (created from a 
previous checkpoint)
+        protected Optional<StreamStateHandle> getUploaded(String filename) {
+            if (confirmedSstFiles.containsKey(filename)) {
+                StreamStateHandle handle = confirmedSstFiles.get(filename);
+                // We introduce a placeholder state handle to reduce network 
transfer overhead,
+                // it will be replaced by the original handle from the shared 
state registry
+                // (created from a previous checkpoint).
                 return Optional.of(
-                        new 
PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID)));
+                        new PlaceholderStreamStateHandle(
+                                handle.getStreamStateHandleID(), 
handle.getStateSize()));
             } else {
                 // Don't use any uploaded but not confirmed handles because 
they might be deleted
                 // (by TM) if the previous checkpoint failed. See FLINK-25395
diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index f93f41fbaa0..9eb66a5ee8c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -27,14 +27,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.SnapshotType;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
@@ -49,7 +48,9 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.nio.file.Path;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -57,10 +58,8 @@ import java.util.Optional;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.stream.Collectors;
 
 import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
-import static 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize;
 
 /**
  * Snapshot strategy for {@link 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}
@@ -78,11 +77,11 @@ public class RocksIncrementalSnapshotStrategy<K>
     private static final String DESCRIPTION = "Asynchronous incremental 
RocksDB snapshot";
 
     /**
-     * Stores the {@link StateHandleID IDs} of uploaded SST files that build 
the incremental
-     * history. Once the checkpoint is confirmed by JM, only the ID paired 
with {@link
-     * PlaceholderStreamStateHandle} can be sent.
+     * Stores the {@link StreamStateHandle} and corresponding local path of 
uploaded SST files that
+     * build the incremental history. Once the checkpoint is confirmed by JM, 
they can be reused for
+     * incremental checkpoint.
      */
-    @Nonnull private final SortedMap<Long, Map<StateHandleID, Long>> 
uploadedStateIDs;
+    @Nonnull private final SortedMap<Long, Collection<HandleAndLocalPath>> 
uploadedSstFiles;
 
     /** The identifier of the last completed checkpoint. */
     private long lastCompletedCheckpointId;
@@ -101,7 +100,7 @@ public class RocksIncrementalSnapshotStrategy<K>
             @Nonnull CloseableRegistry cancelStreamRegistry,
             @Nonnull File instanceBasePath,
             @Nonnull UUID backendUID,
-            @Nonnull SortedMap<Long, Map<StateHandleID, StreamStateHandle>> 
uploadedStateHandles,
+            @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> 
uploadedStateHandles,
             @Nonnull RocksDBStateUploader rocksDBStateUploader,
             long lastCompletedCheckpointId) {
 
@@ -116,16 +115,8 @@ public class RocksIncrementalSnapshotStrategy<K>
                 localRecoveryConfig,
                 instanceBasePath,
                 backendUID);
-        this.uploadedStateIDs = new TreeMap<>();
-        for (Map.Entry<Long, Map<StateHandleID, StreamStateHandle>> entry :
-                uploadedStateHandles.entrySet()) {
-            Map<StateHandleID, Long> map = new HashMap<>();
-            for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry :
-                    entry.getValue().entrySet()) {
-                map.put(stateHandleEntry.getKey(), 
stateHandleEntry.getValue().getStateSize());
-            }
-            this.uploadedStateIDs.put(entry.getKey(), map);
-        }
+
+        this.uploadedSstFiles = new TreeMap<>(uploadedStateHandles);
         this.stateUploader = rocksDBStateUploader;
         this.lastCompletedCheckpointId = lastCompletedCheckpointId;
     }
@@ -174,13 +165,13 @@ public class RocksIncrementalSnapshotStrategy<K>
 
     @Override
     public void notifyCheckpointComplete(long completedCheckpointId) {
-        synchronized (uploadedStateIDs) {
+        synchronized (uploadedSstFiles) {
             // FLINK-23949: 
materializedSstFiles.keySet().contains(completedCheckpointId) make sure
             // the notified checkpointId is not a savepoint, otherwise next 
checkpoint will
             // degenerate into a full checkpoint
             if (completedCheckpointId > lastCompletedCheckpointId
-                    && uploadedStateIDs.containsKey(completedCheckpointId)) {
-                uploadedStateIDs
+                    && uploadedSstFiles.containsKey(completedCheckpointId)) {
+                uploadedSstFiles
                         .keySet()
                         .removeIf(checkpointId -> checkpointId < 
completedCheckpointId);
                 lastCompletedCheckpointId = completedCheckpointId;
@@ -190,8 +181,8 @@ public class RocksIncrementalSnapshotStrategy<K>
 
     @Override
     public void notifyCheckpointAborted(long abortedCheckpointId) {
-        synchronized (uploadedStateIDs) {
-            uploadedStateIDs.keySet().remove(abortedCheckpointId);
+        synchronized (uploadedSstFiles) {
+            uploadedSstFiles.keySet().remove(abortedCheckpointId);
         }
     }
 
@@ -205,13 +196,12 @@ public class RocksIncrementalSnapshotStrategy<K>
             long checkpointId, @Nonnull List<StateMetaInfoSnapshot> 
stateMetaInfoSnapshots) {
 
         final long lastCompletedCheckpoint;
-        final Map<StateHandleID, Long> confirmedSstFiles;
-        final Map<StateHandleID, StreamStateHandle> uploadedSstFiles;
+        final Collection<HandleAndLocalPath> confirmedSstFiles;
 
         // use the last completed checkpoint as the comparison base.
-        synchronized (uploadedStateIDs) {
+        synchronized (uploadedSstFiles) {
             lastCompletedCheckpoint = lastCompletedCheckpointId;
-            confirmedSstFiles = uploadedStateIDs.get(lastCompletedCheckpoint);
+            confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
             LOG.trace(
                     "Use confirmed SST files for checkpoint {}: {}",
                     checkpointId,
@@ -268,9 +258,9 @@ public class RocksIncrementalSnapshotStrategy<K>
             // Handle to the meta data file
             SnapshotResult<StreamStateHandle> metaStateHandle = null;
             // Handles to new sst files since the last completed checkpoint 
will go here
-            final Map<StateHandleID, StreamStateHandle> sstFiles = new 
HashMap<>();
+            final List<HandleAndLocalPath> sstFiles = new ArrayList<>();
             // Handles to the misc files in the current snapshot will go here
-            final Map<StateHandleID, StreamStateHandle> miscFiles = new 
HashMap<>();
+            final List<HandleAndLocalPath> miscFiles = new ArrayList<>();
 
             try {
 
@@ -288,11 +278,13 @@ public class RocksIncrementalSnapshotStrategy<K>
                         metaStateHandle.getJobManagerOwnedSnapshot(),
                         "Metadata for job manager was not properly created.");
 
-                uploadSstFiles(
-                        sstFiles, miscFiles, snapshotCloseableRegistry, 
tmpResourcesRegistry);
                 long checkpointedSize = metaStateHandle.getStateSize();
-                checkpointedSize += getUploadedStateSize(sstFiles.values());
-                checkpointedSize += getUploadedStateSize(miscFiles.values());
+                checkpointedSize +=
+                        uploadSnapshotFiles(
+                                sstFiles,
+                                miscFiles,
+                                snapshotCloseableRegistry,
+                                tmpResourcesRegistry);
 
                 // We make the 'sstFiles' as the 'sharedState' in 
IncrementalRemoteKeyedStateHandle,
                 // whether they belong to the sharded CheckpointedStateScope 
or exclusive
@@ -334,9 +326,10 @@ public class RocksIncrementalSnapshotStrategy<K>
             }
         }
 
-        private void uploadSstFiles(
-                @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles,
-                @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles,
+        /** Uploads the files and returns the total uploaded size. */
+        private long uploadSnapshotFiles(
+                @Nonnull List<HandleAndLocalPath> sstFiles,
+                @Nonnull List<HandleAndLocalPath> miscFiles,
                 @Nonnull CloseableRegistry snapshotCloseableRegistry,
                 @Nonnull CloseableRegistry tmpResourcesRegistry)
                 throws Exception {
@@ -344,43 +337,47 @@ public class RocksIncrementalSnapshotStrategy<K>
             // write state data
             Preconditions.checkState(localBackupDirectory.exists());
 
-            Map<StateHandleID, Path> sstFilePaths = new HashMap<>();
-            Map<StateHandleID, Path> miscFilePaths = new HashMap<>();
-
             Path[] files = localBackupDirectory.listDirectory();
+            long uploadedSize = 0;
             if (files != null) {
+                List<Path> sstFilePaths = new ArrayList<>(files.length);
+                List<Path> miscFilePaths = new ArrayList<>(files.length);
+
                 createUploadFilePaths(files, sstFiles, sstFilePaths, 
miscFilePaths);
 
                 final CheckpointedStateScope stateScope =
                         sharingFilesStrategy == 
SnapshotType.SharingFilesStrategy.NO_SHARING
                                 ? CheckpointedStateScope.EXCLUSIVE
                                 : CheckpointedStateScope.SHARED;
-                sstFiles.putAll(
+
+                List<HandleAndLocalPath> sstFilesUploadResult =
                         stateUploader.uploadFilesToCheckpointFs(
                                 sstFilePaths,
                                 checkpointStreamFactory,
                                 stateScope,
                                 snapshotCloseableRegistry,
-                                tmpResourcesRegistry));
-                miscFiles.putAll(
+                                tmpResourcesRegistry);
+                uploadedSize +=
+                        sstFilesUploadResult.stream().mapToLong(e -> 
e.getStateSize()).sum();
+                sstFiles.addAll(sstFilesUploadResult);
+
+                List<HandleAndLocalPath> miscFilesUploadResult =
                         stateUploader.uploadFilesToCheckpointFs(
                                 miscFilePaths,
                                 checkpointStreamFactory,
                                 stateScope,
                                 snapshotCloseableRegistry,
-                                tmpResourcesRegistry));
+                                tmpResourcesRegistry);
+                uploadedSize +=
+                        miscFilesUploadResult.stream().mapToLong(e -> 
e.getStateSize()).sum();
+                miscFiles.addAll(miscFilesUploadResult);
 
-                synchronized (uploadedStateIDs) {
+                synchronized (uploadedSstFiles) {
                     switch (sharingFilesStrategy) {
                         case FORWARD_BACKWARD:
                         case FORWARD:
-                            uploadedStateIDs.put(
-                                    checkpointId,
-                                    sstFiles.entrySet().stream()
-                                            .collect(
-                                                    Collectors.toMap(
-                                                            Map.Entry::getKey,
-                                                            t -> 
t.getValue().getStateSize())));
+                            uploadedSstFiles.put(
+                                    checkpointId, 
Collections.unmodifiableList(sstFiles));
                             break;
                         case NO_SHARING:
                             break;
@@ -392,27 +389,26 @@ public class RocksIncrementalSnapshotStrategy<K>
                     }
                 }
             }
+            return uploadedSize;
         }
 
         private void createUploadFilePaths(
                 Path[] files,
-                Map<StateHandleID, StreamStateHandle> sstFiles,
-                Map<StateHandleID, Path> sstFilePaths,
-                Map<StateHandleID, Path> miscFilePaths) {
+                List<HandleAndLocalPath> sstFiles,
+                List<Path> sstFilePaths,
+                List<Path> miscFilePaths) {
             for (Path filePath : files) {
                 final String fileName = filePath.getFileName().toString();
-                final StateHandleID stateHandleID = new StateHandleID(fileName);
 
                 if (fileName.endsWith(SST_FILE_SUFFIX)) {
-                    Optional<StreamStateHandle> uploaded =
-                            previousSnapshot.getUploaded(stateHandleID);
+                    Optional<StreamStateHandle> uploaded = previousSnapshot.getUploaded(fileName);
                     if (uploaded.isPresent()) {
-                        sstFiles.put(stateHandleID, uploaded.get());
+                        sstFiles.add(HandleAndLocalPath.of(uploaded.get(), fileName));
                     } else {
-                        sstFilePaths.put(stateHandleID, filePath); // re-upload
+                        sstFilePaths.add(filePath); // re-upload
                     }
                 } else {
-                    miscFilePaths.put(stateHandleID, filePath);
+                    miscFilePaths.add(filePath);
                 }
             }
         }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java
index a6d4f843d7d..3fc45862dc8 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java
@@ -26,13 +26,13 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.LocalRecoveryConfig;
 import org.apache.flink.runtime.state.SnapshotDirectory;
 import org.apache.flink.runtime.state.SnapshotResult;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.Preconditions;
@@ -45,16 +45,15 @@ import javax.annotation.Nonnull;
 
 import java.io.File;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 
-import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize;
-
 /**
  * Snapshot strategy for {@link RocksDBKeyedStateBackend} based on RocksDB's native checkpoints and
  * creates full snapshots. the difference between savepoint is that sst files will be uploaded
@@ -165,7 +164,7 @@ public class RocksNativeFullSnapshotStrategy<K>
             // Handle to the meta data file
             SnapshotResult<StreamStateHandle> metaStateHandle = null;
             // Handles to all the files in the current snapshot will go here
-            final Map<StateHandleID, StreamStateHandle> privateFiles = new HashMap<>();
+            final List<HandleAndLocalPath> privateFiles = new ArrayList<>();
 
             try {
 
@@ -183,23 +182,25 @@ public class RocksNativeFullSnapshotStrategy<K>
                         metaStateHandle.getJobManagerOwnedSnapshot(),
                         "Metadata for job manager was not properly created.");
 
-                uploadSstFiles(privateFiles, snapshotCloseableRegistry, tmpResourcesRegistry);
                 long checkpointedSize = metaStateHandle.getStateSize();
-                checkpointedSize += getUploadedStateSize(privateFiles.values());
+
+                checkpointedSize +=
+                        uploadSnapshotFiles(
+                                privateFiles, snapshotCloseableRegistry, tmpResourcesRegistry);
 
                 final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle =
                         new IncrementalRemoteKeyedStateHandle(
                                 backendUID,
                                 keyGroupRange,
                                 checkpointId,
-                                Collections.emptyMap(),
+                                Collections.emptyList(),
                                 privateFiles,
                                 metaStateHandle.getJobManagerOwnedSnapshot(),
                                 checkpointedSize);
 
                 Optional<KeyedStateHandle> localSnapshot =
                         getLocalSnapshot(
-                                metaStateHandle.getTaskLocalSnapshot(), Collections.emptyMap());
+                                metaStateHandle.getTaskLocalSnapshot(), Collections.emptyList());
                 final SnapshotResult<KeyedStateHandle> snapshotResult =
                         localSnapshot
                                 .map(
@@ -219,8 +220,9 @@ public class RocksNativeFullSnapshotStrategy<K>
             }
         }
 
-        private void uploadSstFiles(
-                @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles,
+        /** upload files and return total uploaded size. */
+        private long uploadSnapshotFiles(
+                @Nonnull List<HandleAndLocalPath> privateFiles,
                 @Nonnull CloseableRegistry snapshotCloseableRegistry,
                 @Nonnull CloseableRegistry tmpResourcesRegistry)
                 throws Exception {
@@ -228,25 +230,23 @@ public class RocksNativeFullSnapshotStrategy<K>
             // write state data
             Preconditions.checkState(localBackupDirectory.exists());
 
-            Map<StateHandleID, Path> privateFilePaths = new HashMap<>();
-
             Path[] files = localBackupDirectory.listDirectory();
+            long uploadedSize = 0;
             if (files != null) {
                 // all sst files are private in full snapshot
-                for (Path filePath : files) {
-                    final String fileName = filePath.getFileName().toString();
-                    final StateHandleID stateHandleID = new StateHandleID(fileName);
-                    privateFilePaths.put(stateHandleID, filePath);
-                }
-
-                privateFiles.putAll(
+                List<HandleAndLocalPath> uploadedFiles =
                         stateUploader.uploadFilesToCheckpointFs(
-                                privateFilePaths,
+                                Arrays.asList(files),
                                 checkpointStreamFactory,
                                 CheckpointedStateScope.EXCLUSIVE,
                                 snapshotCloseableRegistry,
-                                tmpResourcesRegistry));
+                                tmpResourcesRegistry);
+
+                uploadedSize += uploadedFiles.stream().mapToLong(e -> e.getStateSize()).sum();
+
+                privateFiles.addAll(uploadedFiles);
             }
+            return uploadedSize;
         }
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
index 10425708b4f..cf0692c6777 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.contrib.streaming.state.snapshot;
 
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import java.util.Collection;
-
 /**
  * Utility methods and constants around RocksDB creating and restoring snapshots for {@link
  * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}.
@@ -36,11 +30,4 @@ public class RocksSnapshotUtil {
     private RocksSnapshotUtil() {
         throw new AssertionError();
     }
-
-    public static long getUploadedStateSize(Collection<StreamStateHandle> streamStateHandles) {
-        return streamStateHandles.stream()
-                .filter(s -> !(s instanceof PlaceholderStreamStateHandle))
-                .mapToLong(StateObject::getStateSize)
-                .sum();
-    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
index 4d6b533d662..642ff87e248 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java
@@ -33,14 +33,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
 import org.apache.flink.runtime.state.ConfigurableStateBackend;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryImpl;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackendTestBase;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
@@ -78,12 +77,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
 
 import static junit.framework.TestCase.assertNotNull;
 import static org.junit.Assert.assertEquals;
@@ -582,17 +580,23 @@ public class EmbeddedRocksDBStateBackendTest
                             (IncrementalRemoteKeyedStateHandle)
                                     snapshotResult.getJobManagerOwnedSnapshot();
 
-                    Map<StateHandleID, StreamStateHandle> sharedState =
-                            new HashMap<>(stateHandle.getSharedState());
+                    // create new HandleAndLocalPath object for keeping handle before replacement
+                    List<HandleAndLocalPath> sharedState =
+                            stateHandle.getSharedState().stream()
+                                    .map(
+                                            e ->
+                                                    HandleAndLocalPath.of(
+                                                            e.getHandle(), e.getLocalPath()))
+                                    .collect(Collectors.toList());
 
                     stateHandle.registerSharedStates(sharedStateRegistry, checkpointId);
 
-                    for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) {
+                    for (HandleAndLocalPath handleAndLocalPath : sharedState) {
                         verify(sharedStateRegistry)
                                 .registerReference(
-                                        stateHandle.createSharedStateRegistryKeyFromFileName(
-                                                e.getKey()),
-                                        e.getValue(),
+                                        stateHandle.createSharedStateRegistryKey(
+                                                handleAndLocalPath.getHandle()),
+                                        handleAndLocalPath.getHandle(),
                                         checkpointId);
                     }
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index fcce8674887..e19ee84d63f 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -20,9 +20,9 @@ package org.apache.flink.contrib.streaming.state;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestStreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -37,9 +37,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.UUID;
@@ -59,8 +57,8 @@ public class RocksDBStateDownloaderTest extends TestLogger {
                 new SpecifiedException("throw exception while multi thread restore.");
         StreamStateHandle stateHandle = new ThrowingStateHandle(expectedException);
 
-        Map<StateHandleID, StreamStateHandle> stateHandles = new HashMap<>(1);
-        stateHandles.put(new StateHandleID("state1"), stateHandle);
+        List<HandleAndLocalPath> stateHandles = new ArrayList<>(1);
+        stateHandles.add(HandleAndLocalPath.of(stateHandle, "state1"));
 
         IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
                 new IncrementalRemoteKeyedStateHandle(
@@ -98,12 +96,13 @@ public class RocksDBStateDownloaderTest extends TestLogger {
             handles.add(new ByteStreamStateHandle(String.format("state%d", i), contents[i]));
         }
 
-        Map<StateHandleID, StreamStateHandle> sharedStates = new HashMap<>(contentNum);
-        Map<StateHandleID, StreamStateHandle> privateStates = new HashMap<>(contentNum);
+        List<HandleAndLocalPath> sharedStates = new ArrayList<>(contentNum);
+        List<HandleAndLocalPath> privateStates = new ArrayList<>(contentNum);
         for (int i = 0; i < contentNum; ++i) {
-            sharedStates.put(new StateHandleID(String.format("sharedState%d", i)), handles.get(i));
-            privateStates.put(
-                    new StateHandleID(String.format("privateState%d", i)), handles.get(i));
+            sharedStates.add(
+                    HandleAndLocalPath.of(handles.get(i), String.format("sharedState%d", i)));
+            privateStates.add(
+                    HandleAndLocalPath.of(handles.get(i), String.format("privateState%d", i)));
         }
 
         IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle =
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
index 7af62c83705..60e0fee9d79 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -42,10 +42,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -90,8 +89,8 @@ public class RocksDBStateUploaderTest extends TestLogger {
         File file = TempDirUtils.newFile(temporaryFolder, String.valueOf(UUID.randomUUID()));
         generateRandomFileContent(file.getPath(), 20);
 
-        Map<StateHandleID, Path> filePaths = new HashMap<>(1);
-        filePaths.put(new StateHandleID("mockHandleID"), file.toPath());
+        List<Path> filePaths = new ArrayList<>(1);
+        filePaths.add(file.toPath());
         try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
             assertThatThrownBy(
                             () ->
@@ -134,7 +133,7 @@ public class RocksDBStateUploaderTest extends TestLogger {
         String localFolder = "local";
         TempDirUtils.newFolder(temporaryFolder, localFolder);
 
-        Map<StateHandleID, Path> filePaths =
+        List<Path> filePaths =
                 generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
         CloseableRegistry tmpResourcesRegistry = new CloseableRegistry();
         try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(1)) {
@@ -165,12 +164,11 @@ public class RocksDBStateUploaderTest extends TestLogger {
             assertThat(checkpointPrivateFolder.list()).isEmpty();
             assertThat(checkpointSharedFolder.list()).isEmpty();
 
-            Map.Entry<StateHandleID, Path> first = filePaths.entrySet().stream().findFirst().get();
+            Path first = filePaths.stream().findFirst().get();
             assertThatThrownBy(
                             () ->
                                     rocksDBStateUploader.uploadFilesToCheckpointFs(
-                                            Collections.singletonMap(
-                                                    first.getKey(), first.getValue()),
+                                            Collections.singletonList(first),
                                             checkpointStreamFactory,
                                             CheckpointedStateScope.SHARED,
                                             new CloseableRegistry(),
@@ -210,11 +208,11 @@ public class RocksDBStateUploaderTest extends TestLogger {
         TempDirUtils.newFolder(temporaryFolder, localFolder);
 
         int sstFileCount = 6;
-        Map<StateHandleID, Path> sstFilePaths =
+        List<Path> sstFilePaths =
                 generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold);
 
         try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) {
-            Map<StateHandleID, StreamStateHandle> sstFiles =
+            List<HandleAndLocalPath> sstFiles =
                     rocksDBStateUploader.uploadFilesToCheckpointFs(
                             sstFilePaths,
                             checkpointStreamFactory,
@@ -222,9 +220,15 @@ public class RocksDBStateUploaderTest extends TestLogger {
                             new CloseableRegistry(),
                             new CloseableRegistry());
 
-            for (Map.Entry<StateHandleID, Path> entry : sstFilePaths.entrySet()) {
+            for (Path path : sstFilePaths) {
                 assertStateContentEqual(
-                        entry.getValue(), sstFiles.get(entry.getKey()).openInputStream());
+                        path,
+                        sstFiles.stream()
+                                .filter(e -> e.getLocalPath().equals(path.getFileName().toString()))
+                                .findFirst()
+                                .get()
+                                .getHandle()
+                                .openInputStream());
             }
         }
     }
@@ -259,18 +263,18 @@ public class RocksDBStateUploaderTest extends TestLogger {
         };
     }
 
-    private Map<StateHandleID, Path> generateRandomSstFiles(
+    private List<Path> generateRandomSstFiles(
             String localFolder, int sstFileCount, int fileStateSizeThreshold) throws IOException {
         ThreadLocalRandom random = ThreadLocalRandom.current();
 
-        Map<StateHandleID, Path> sstFilePaths = new HashMap<>(sstFileCount);
+        List<Path> sstFilePaths = new ArrayList<>(sstFileCount);
         for (int i = 0; i < sstFileCount; ++i) {
             File file =
                     TempDirUtils.newFile(
                             temporaryFolder, String.format("%s/%d.sst", localFolder, i));
             generateRandomFileContent(
                     file.getPath(), random.nextInt(1_000_000) + fileStateSizeThreshold);
-            sstFilePaths.put(new StateHandleID(String.valueOf(i)), file.toPath());
+            sstFilePaths.add(file.toPath());
         }
         return sstFilePaths;
     }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
index 4e4a85aaf66..a3469a24306 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -28,12 +28,10 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.ArrayListSerializer;
 import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
 import org.apache.flink.util.ResourceGuard;
@@ -49,8 +47,8 @@ import org.rocksdb.RocksDBException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedHashMap;
-import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -89,16 +87,11 @@ public class RocksIncrementalSnapshotStrategyTest {
                             checkpointStreamFactory,
                             closeableRegistry);
 
-            // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 3rd checkpoint is
+            // If 3rd checkpoint's full size > checkpointed size, it means 3rd checkpoint is
             // incremental.
-            Map<StateHandleID, StreamStateHandle> sharedState3 =
-                    incrementalRemoteKeyedStateHandle3.getSharedState();
-            long placeholderStateHandleCount =
-                    sharedState3.entrySet().stream()
-                            .filter(e -> e.getValue() instanceof PlaceholderStreamStateHandle)
-                            .count();
-
-            Assert.assertTrue(placeholderStateHandleCount > 0);
+            Assert.assertTrue(
+                    incrementalRemoteKeyedStateHandle3.getStateSize()
+                            > incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
         }
     }
 
@@ -114,8 +107,7 @@ public class RocksIncrementalSnapshotStrategyTest {
         // construct RocksIncrementalSnapshotStrategy
         long lastCompletedCheckpointId = -1L;
         ResourceGuard rocksDBResourceGuard = new ResourceGuard();
-        SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles =
-                new TreeMap<>();
+        SortedMap<Long, Collection<HandleAndLocalPath>> materializedSstFiles = new TreeMap<>();
         LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
                 new LinkedHashMap<>();
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java
index 0ce99b2f1a9..f81a7cb93a1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java
@@ -38,7 +38,7 @@ import org.junit.Test;
 
 import java.util.Map;
 
-import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptyList;
 import static java.util.UUID.randomUUID;
 import static org.apache.flink.runtime.operators.lifecycle.command.TestCommand.FINISH_SOURCES;
 import static org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS;
@@ -89,8 +89,8 @@ public class StateHandleReuseITCase extends AbstractTestBase {
                         randomUUID(),
                         EMPTY_KEY_GROUP_RANGE,
                         1L,
-                        emptyMap(),
-                        emptyMap(),
+                        emptyList(),
+                        emptyList(),
                         new ByteStreamStateHandle("meta", new byte[] {0}),
                         0L);
 

Reply via email to