This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch release-1.16 in repository https://gitbox.apache.org/repos/asf/flink.git
commit c6c54412bf70af4432071dfdf131b62ff8236fe7 Author: wangfeifan <zoltar9...@163.com> AuthorDate: Mon Jul 24 15:39:51 2023 +0800 [FLINK-29913][checkpointing] Use PhysicalStateHandleID as a key for shared state of IncrementalRemoteKeyedStateHandle Before this PR, IncrementalRemoteKeyedStateHandle used the local file name of each shared state as its SharedStateRegistryKey, so the handles registered under the same key were not necessarily equal (a file may be re-uploaded when maxConcurrentCheckpoint > 1, or when recovering from a retained checkpoint in no-claim mode). This collision of registry keys caused SharedStateRegistry to wrongly delete state handles. Co-authored-by: Roman <khachatryan.ro...@gmail.com> --- .../metadata/MetadataV2V3SerializerBase.java | 31 ++--- .../runtime/state/IncrementalKeyedStateHandle.java | 69 ++++++++- .../state/IncrementalLocalKeyedStateHandle.java | 16 +-- .../state/IncrementalRemoteKeyedStateHandle.java | 86 ++++++------ .../state/PlaceholderStreamStateHandle.java | 7 +- .../changelog/ChangelogStateBackendHandle.java | 31 +++-- .../checkpoint/CheckpointCoordinatorTest.java | 101 ++++++++------ .../checkpoint/metadata/CheckpointTestUtils.java | 14 +- .../runtime/scheduler/SchedulerUtilsTest.java | 34 +++-- .../IncrementalRemoteKeyedStateHandleTest.java | 155 +++++++++++++-------- .../runtime/state/SharedStateRegistryTest.java | 4 +- .../state/RocksDBKeyedStateBackendBuilder.java | 8 +- .../streaming/state/RocksDBStateDownloader.java | 24 ++-- .../streaming/state/RocksDBStateUploader.java | 64 ++++----- .../RocksDBIncrementalRestoreOperation.java | 4 +- .../state/restore/RocksDBRestoreResult.java | 11 +- .../snapshot/RocksDBSnapshotStrategyBase.java | 38 +++-- .../snapshot/RocksIncrementalSnapshotStrategy.java | 120 ++++++++-------- .../snapshot/RocksNativeFullSnapshotStrategy.java | 44 +++--- .../state/snapshot/RocksSnapshotUtil.java | 13 -- .../state/EmbeddedRocksDBStateBackendTest.java | 24 ++-- .../state/RocksDBStateDownloaderTest.java | 19 ++- 
.../streaming/state/RocksDBStateUploaderTest.java | 36 ++--- .../RocksIncrementalSnapshotStrategyTest.java | 22 +-- .../test/checkpointing/StateHandleReuseITCase.java | 6 +- 25 files changed, 548 insertions(+), 433 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 37f22ee95ad..5f898359f96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; @@ -341,8 +342,8 @@ public abstract class MetadataV2V3SerializerBase { serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos); - serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getSharedState(), dos); + serializeHandleAndLocalPathList(incrementalKeyedStateHandle.getPrivateState(), dos); writeStateHandleId(incrementalKeyedStateHandle, dos); } else if (stateHandle instanceof ChangelogStateBackendHandle) { @@ -548,10 +549,8 @@ public abstract class MetadataV2V3SerializerBase { KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups 
- 1); StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis, context); - Map<StateHandleID, StreamStateHandle> sharedStates = - deserializeStreamStateHandleMap(dis, context); - Map<StateHandleID, StreamStateHandle> privateStates = - deserializeStreamStateHandleMap(dis, context); + List<HandleAndLocalPath> sharedStates = deserializeHandleAndLocalPathList(dis, context); + List<HandleAndLocalPath> privateStates = deserializeHandleAndLocalPathList(dis, context); UUID uuid; @@ -804,26 +803,26 @@ public abstract class MetadataV2V3SerializerBase { return new StateObjectCollection<>(result); } - private static void serializeStreamStateHandleMap( - Map<StateHandleID, StreamStateHandle> map, DataOutputStream dos) throws IOException { + private static void serializeHandleAndLocalPathList( + List<HandleAndLocalPath> list, DataOutputStream dos) throws IOException { - dos.writeInt(map.size()); - for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) { - dos.writeUTF(entry.getKey().toString()); - serializeStreamStateHandle(entry.getValue(), dos); + dos.writeInt(list.size()); + for (HandleAndLocalPath handleAndLocalPath : list) { + dos.writeUTF(handleAndLocalPath.getLocalPath()); + serializeStreamStateHandle(handleAndLocalPath.getHandle(), dos); } } - private static Map<StateHandleID, StreamStateHandle> deserializeStreamStateHandleMap( + private static List<HandleAndLocalPath> deserializeHandleAndLocalPathList( DataInputStream dis, @Nullable DeserializationContext context) throws IOException { final int size = dis.readInt(); - Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size); + List<HandleAndLocalPath> result = new ArrayList<>(size); for (int i = 0; i < size; ++i) { - StateHandleID stateHandleID = new StateHandleID(dis.readUTF()); + String localPath = dis.readUTF(); StreamStateHandle stateHandle = deserializeStreamStateHandle(dis, context); - result.put(stateHandleID, stateHandle); + 
result.add(HandleAndLocalPath.of(stateHandle, localPath)); } return result; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java index 76a6cd296dd..9ce3cbd332f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java @@ -20,9 +20,13 @@ package org.apache.flink.runtime.state; import javax.annotation.Nonnull; -import java.util.Map; +import java.io.Serializable; +import java.util.List; +import java.util.Objects; import java.util.UUID; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** Common interface to all incremental {@link KeyedStateHandle}. */ public interface IncrementalKeyedStateHandle extends KeyedStateHandle, CheckpointBoundKeyedStateHandle { @@ -32,9 +36,66 @@ public interface IncrementalKeyedStateHandle UUID getBackendIdentifier(); /** - * Returns a set of ids of all registered shared states in the backend at the time this was - * created. + * Returns a list of all shared states and the corresponding localPath in the backend at the + * time this was created. */ @Nonnull - Map<StateHandleID, StreamStateHandle> getSharedStateHandles(); + List<HandleAndLocalPath> getSharedStateHandles(); + + /** A Holder of StreamStateHandle and the corresponding localPath. 
*/ + final class HandleAndLocalPath implements Serializable { + + private static final long serialVersionUID = 7711754687567545052L; + + StreamStateHandle handle; + final String localPath; + + public static HandleAndLocalPath of(StreamStateHandle handle, String localPath) { + checkNotNull(handle, "streamStateHandle cannot be null"); + checkNotNull(localPath, "localPath cannot be null"); + return new HandleAndLocalPath(handle, localPath); + } + + private HandleAndLocalPath(StreamStateHandle handle, String localPath) { + this.handle = handle; + this.localPath = localPath; + } + + public StreamStateHandle getHandle() { + return this.handle; + } + + public String getLocalPath() { + return this.localPath; + } + + public long getStateSize() { + return this.handle.getStateSize(); + } + + /** Replace the StreamStateHandle with the registry returned one. */ + public void replaceHandle(StreamStateHandle registryReturned) { + checkNotNull(registryReturned); + this.handle = registryReturned; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof HandleAndLocalPath)) { + return false; + } + + HandleAndLocalPath that = (HandleAndLocalPath) o; + return this.handle.equals(that.handle) && this.localPath.equals(that.localPath); + } + + @Override + public int hashCode() { + return Objects.hash(handle, localPath); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java index 85405dc649e..d782cf886bd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java @@ -23,8 +23,8 @@ import org.apache.flink.util.ExceptionUtils; import javax.annotation.Nonnegative; import javax.annotation.Nonnull; -import java.util.HashMap; 
-import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; /** @@ -46,8 +46,8 @@ public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle /** Handle to Flink's state meta data. */ @Nonnull private final StreamStateHandle metaDataState; - /** Set with the ids of all shared state handles created by the checkpoint. */ - @Nonnull private final Map<StateHandleID, StreamStateHandle> sharedStateHandleIDs; + /** All shared state handles and the corresponding localPath used by the checkpoint. */ + @Nonnull private final List<HandleAndLocalPath> sharedState; public IncrementalLocalKeyedStateHandle( @Nonnull UUID backendIdentifier, @@ -55,13 +55,13 @@ public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle @Nonnull DirectoryStateHandle directoryStateHandle, @Nonnull KeyGroupRange keyGroupRange, @Nonnull StreamStateHandle metaDataState, - @Nonnull Map<StateHandleID, StreamStateHandle> sharedStateHandleIDs) { + @Nonnull List<HandleAndLocalPath> sharedState) { super(directoryStateHandle, keyGroupRange); this.backendIdentifier = backendIdentifier; this.checkpointId = checkpointId; this.metaDataState = metaDataState; - this.sharedStateHandleIDs = new HashMap<>(sharedStateHandleIDs); + this.sharedState = new ArrayList<>(sharedState); } @Nonnull @@ -93,8 +93,8 @@ public class IncrementalLocalKeyedStateHandle extends DirectoryKeyedStateHandle @Override @Nonnull - public Map<StateHandleID, StreamStateHandle> getSharedStateHandles() { - return sharedStateHandleIDs; + public List<HandleAndLocalPath> getSharedStateHandles() { + return sharedState; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java index 28813e7bd97..9318259fe62 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java @@ -26,8 +26,10 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; -import java.util.Map; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.UUID; +import java.util.stream.Collectors; /** * The handle to states of an incremental snapshot. @@ -77,10 +79,10 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH private final long checkpointId; /** Shared state in the incremental checkpoint. */ - private final Map<StateHandleID, StreamStateHandle> sharedState; + private final List<HandleAndLocalPath> sharedState; /** Private state in the incremental checkpoint. */ - private final Map<StateHandleID, StreamStateHandle> privateState; + private final List<HandleAndLocalPath> privateState; /** Primary meta data state of the incremental checkpoint. 
*/ private final StreamStateHandle metaStateHandle; @@ -103,8 +105,8 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH UUID backendIdentifier, KeyGroupRange keyGroupRange, long checkpointId, - Map<StateHandleID, StreamStateHandle> sharedState, - Map<StateHandleID, StreamStateHandle> privateState, + List<HandleAndLocalPath> sharedState, + List<HandleAndLocalPath> privateState, StreamStateHandle metaStateHandle) { this( backendIdentifier, @@ -121,8 +123,8 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH UUID backendIdentifier, KeyGroupRange keyGroupRange, long checkpointId, - Map<StateHandleID, StreamStateHandle> sharedState, - Map<StateHandleID, StreamStateHandle> privateState, + List<HandleAndLocalPath> sharedState, + List<HandleAndLocalPath> privateState, StreamStateHandle metaStateHandle, long persistedSizeOfThisCheckpoint) { @@ -141,8 +143,8 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH UUID backendIdentifier, KeyGroupRange keyGroupRange, long checkpointId, - Map<StateHandleID, StreamStateHandle> sharedState, - Map<StateHandleID, StreamStateHandle> privateState, + List<HandleAndLocalPath> sharedState, + List<HandleAndLocalPath> privateState, StreamStateHandle metaStateHandle, long persistedSizeOfThisCheckpoint, StateHandleID stateHandleId) { @@ -164,8 +166,8 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH UUID backendIdentifier, KeyGroupRange keyGroupRange, long checkpointId, - Map<StateHandleID, StreamStateHandle> sharedState, - Map<StateHandleID, StreamStateHandle> privateState, + List<HandleAndLocalPath> sharedState, + List<HandleAndLocalPath> privateState, StreamStateHandle metaStateHandle, long persistedSizeOfThisCheckpoint, StateHandleID stateHandleId) { @@ -203,11 +205,11 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH stateHandleId); } - public Map<StateHandleID, 
StreamStateHandle> getSharedState() { + public List<HandleAndLocalPath> getSharedState() { return sharedState; } - public Map<StateHandleID, StreamStateHandle> getPrivateState() { + public List<HandleAndLocalPath> getPrivateState() { return privateState; } @@ -222,7 +224,7 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH @Nonnull @Override - public Map<StateHandleID, StreamStateHandle> getSharedStateHandles() { + public List<HandleAndLocalPath> getSharedStateHandles() { return getSharedState(); } @@ -262,7 +264,10 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH } try { - StateUtil.bestEffortDiscardAllStateObjects(privateState.values()); + StateUtil.bestEffortDiscardAllStateObjects( + privateState.stream() + .map(HandleAndLocalPath::getHandle) + .collect(Collectors.toList())); } catch (Exception e) { LOG.warn("Could not properly discard misc file states.", e); } @@ -270,7 +275,10 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH // discard only on TM; on JM, shared state is removed on subsumption if (!isRegistered) { try { - StateUtil.bestEffortDiscardAllStateObjects(sharedState.values()); + StateUtil.bestEffortDiscardAllStateObjects( + sharedState.stream() + .map(HandleAndLocalPath::getHandle) + .collect(Collectors.toList())); } catch (Exception e) { LOG.warn("Could not properly discard new sst file states.", e); } @@ -281,12 +289,12 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH public long getStateSize() { long size = StateUtil.getStateSize(metaStateHandle); - for (StreamStateHandle sharedStateHandle : sharedState.values()) { - size += sharedStateHandle.getStateSize(); + for (HandleAndLocalPath handleAndLocalPath : sharedState) { + size += handleAndLocalPath.getStateSize(); } - for (StreamStateHandle privateStateHandle : privateState.values()) { - size += privateStateHandle.getStateSize(); + for (HandleAndLocalPath 
handleAndLocalPath : privateState) { + size += handleAndLocalPath.getStateSize(); } return size; @@ -321,28 +329,18 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH checkpointId, backendIdentifier); - for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : - sharedState.entrySet()) { - SharedStateRegistryKey registryKey = - createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey()); - + for (HandleAndLocalPath handleAndLocalPath : sharedState) { StreamStateHandle reference = stateRegistry.registerReference( - registryKey, sharedStateHandle.getValue(), checkpointID); - - // This step consolidates our shared handles with the registry, which does two things: - // - // 1) Replace placeholder state handle with already registered, actual state handles. - // - // 2) Deduplicate re-uploads of incremental state due to missing confirmations about - // completed checkpoints. - // - // This prevents the following problem: - // A previous checkpoint n has already registered the state. This can happen if a - // following checkpoint (n + x) wants to reference the same state before the backend got - // notified that checkpoint n completed. In this case, the shared registry did - // deduplication and returns the previous reference. - sharedStateHandle.setValue(reference); + createSharedStateRegistryKey(handleAndLocalPath.getHandle()), + handleAndLocalPath.getHandle(), + checkpointID); + + // This step consolidates our shared handles with the registry, which will replace + // placeholder state handle with already registered, actual state handles. + // Because of SharedStateRegistryKey is based on the physical id of the stream handle, + // no de-duplication will be performed. see FLINK-29913. 
+ handleAndLocalPath.replaceHandle(reference); } } @@ -359,11 +357,13 @@ public class IncrementalRemoteKeyedStateHandle implements IncrementalKeyedStateH stateHandleId); } - /** Create a unique key to register one of our shared state handles. */ + /** Create a unique key based on physical id to register one of our shared state handles. */ @VisibleForTesting - public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) { + public SharedStateRegistryKey createSharedStateRegistryKey(StreamStateHandle handle) { + String keyString = handle.getStreamStateHandleID().getKeyString(); + // key strings tend to be longer, so we use the MD5 of the key string to save memory return new SharedStateRegistryKey( - String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId); + UUID.nameUUIDFromBytes(keyString.getBytes(StandardCharsets.UTF_8)).toString()); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java index 4b42427de1c..a593ab0dbe2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java @@ -33,9 +33,11 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle { private static final long serialVersionUID = 1L; + private final PhysicalStateHandleID physicalID; private final long stateSize; - public PlaceholderStreamStateHandle(long stateSize) { + public PlaceholderStreamStateHandle(PhysicalStateHandleID physicalID, long stateSize) { + this.physicalID = physicalID; this.stateSize = stateSize; } @@ -53,8 +55,7 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle { @Override public PhysicalStateHandleID getStreamStateHandleID() { - throw new UnsupportedOperationException( - "This is only a placeholder to be replaced 
by a real StreamStateHandle in the checkpoint coordinator."); + return physicalID; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java index 6cf86ec65af..2d786c44d0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.changelog; import org.apache.flink.annotation.Internal; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.runtime.state.CheckpointBoundKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupsSavepointStateHandle; @@ -37,7 +38,6 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.stream.Collectors; @@ -175,18 +175,23 @@ public interface ChangelogStateBackendHandle StreamStateHandle castMetaStateHandle = restoreFileStateHandle( incrementalRemoteKeyedStateHandle.getMetaStateHandle()); - Map<StateHandleID, StreamStateHandle> castSharedStates = - incrementalRemoteKeyedStateHandle.getSharedState().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> restoreFileStateHandle(e.getValue()))); - Map<StateHandleID, StreamStateHandle> castPrivateStates = - incrementalRemoteKeyedStateHandle.getPrivateState().entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - e -> restoreFileStateHandle(e.getValue()))); + List<HandleAndLocalPath> castSharedStates = 
+ incrementalRemoteKeyedStateHandle.getSharedState().stream() + .map( + e -> + HandleAndLocalPath.of( + restoreFileStateHandle(e.getHandle()), + e.getLocalPath())) + .collect(Collectors.toList()); + + List<HandleAndLocalPath> castPrivateStates = + incrementalRemoteKeyedStateHandle.getPrivateState().stream() + .map( + e -> + HandleAndLocalPath.of( + restoreFileStateHandle(e.getHandle()), + e.getLocalPath())) + .collect(Collectors.toList()); return IncrementalRemoteKeyedStateHandle.restore( incrementalRemoteKeyedStateHandle.getBackendIdentifier(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 6749b9aa516..6fa95fb5380 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.state.CheckpointMetadataOutputStream; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointStorageAccess; import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; @@ -58,7 +59,6 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestingStreamStateHandle; import 
org.apache.flink.runtime.state.filesystem.FileStateHandle; @@ -119,6 +119,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; +import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED; @@ -2907,11 +2908,11 @@ public class CheckpointCoordinatorTest extends TestLogger { int sharedHandleCount = 0; - List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint = + List<List<HandleAndLocalPath>> sharedHandlesByCheckpoint = new ArrayList<>(numCheckpoints); for (int i = 0; i < numCheckpoints; ++i) { - sharedHandlesByCheckpoint.add(new HashMap<>(2)); + sharedHandlesByCheckpoint.add(new ArrayList<>(2)); } int cp = 0; @@ -2929,20 +2930,23 @@ public class CheckpointCoordinatorTest extends TestLogger { sharedHandlesByCheckpoint .get(cp) - .putAll(incrementalKeyedStateHandle.getSharedState()); - - for (StreamStateHandle streamStateHandle : - incrementalKeyedStateHandle.getSharedState().values()) { - assertTrue( - !(streamStateHandle - instanceof PlaceholderStreamStateHandle)); + .addAll(incrementalKeyedStateHandle.getSharedState()); + + for (HandleAndLocalPath handleAndLocalPath : + incrementalKeyedStateHandle.getSharedState()) { + StreamStateHandle streamStateHandle = + handleAndLocalPath.getHandle(); + assertThat( + streamStateHandle + instanceof PlaceholderStreamStateHandle) + .isFalse(); verify(streamStateHandle, never()).discardState(); ++sharedHandleCount; } - for (StreamStateHandle streamStateHandle : - incrementalKeyedStateHandle.getPrivateState().values()) { - verify(streamStateHandle, never()).discardState(); + for (HandleAndLocalPath handleAndLocalPath : + incrementalKeyedStateHandle.getPrivateState()) { + 
verify(handleAndLocalPath.getHandle(), never()).discardState(); } verify(incrementalKeyedStateHandle.getMetaStateHandle(), never()) @@ -2962,11 +2966,10 @@ public class CheckpointCoordinatorTest extends TestLogger { store.removeOldestCheckpoint(); // we expect no shared state was discarded because the state of CP0 is still referenced - // by - // CP1 - for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint) { - for (StreamStateHandle streamStateHandle : cpList.values()) { - verify(streamStateHandle, never()).discardState(); + // by CP1 + for (List<HandleAndLocalPath> cpList : sharedHandlesByCheckpoint) { + for (HandleAndLocalPath handleAndLocalPath : cpList) { + verify(handleAndLocalPath.getHandle(), never()).discardState(); } } @@ -3941,26 +3944,35 @@ public class CheckpointCoordinatorTest extends TestLogger { KeyGroupRange keyGroupRange = keyGroupPartitions1.get(index); - Map<StateHandleID, StreamStateHandle> privateState = new HashMap<>(); - privateState.put( - new StateHandleID("private-1"), - spy(new ByteStreamStateHandle("private-1", new byte[] {'p'}))); + List<HandleAndLocalPath> privateState = new ArrayList<>(); + privateState.add( + HandleAndLocalPath.of( + spy(new ByteStreamStateHandle("private-1", new byte[] {'p'})), + "private-1")); - Map<StateHandleID, StreamStateHandle> sharedState = new HashMap<>(); + List<HandleAndLocalPath> sharedState = new ArrayList<>(); // let all but the first CP overlap by one shared state. 
if (cpSequenceNumber > 0) { - sharedState.put( - new StateHandleID("shared-" + (cpSequenceNumber - 1)), - spy(new PlaceholderStreamStateHandle(1L))); + sharedState.add( + HandleAndLocalPath.of( + spy( + new ByteStreamStateHandle( + "shared-" + + (cpSequenceNumber - 1) + + "-" + + keyGroupRange, + new byte[] {'s'})), + "shared-" + (cpSequenceNumber - 1))); } - sharedState.put( - new StateHandleID("shared-" + cpSequenceNumber), - spy( - new ByteStreamStateHandle( - "shared-" + cpSequenceNumber + "-" + keyGroupRange, - new byte[] {'s'}))); + sharedState.add( + HandleAndLocalPath.of( + spy( + new ByteStreamStateHandle( + "shared-" + cpSequenceNumber + "-" + keyGroupRange, + new byte[] {'s'})), + "shared-" + cpSequenceNumber)); IncrementalRemoteKeyedStateHandle managedState = spy( @@ -4096,15 +4108,15 @@ public class CheckpointCoordinatorTest extends TestLogger { } private static void verifyDiscard( - List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint1, + List<List<HandleAndLocalPath>> sharedHandles, Function<Integer, VerificationMode> checkpointVerify) throws Exception { - for (Map<StateHandleID, StreamStateHandle> cpList : sharedHandlesByCheckpoint1) { - for (Map.Entry<StateHandleID, StreamStateHandle> entry : cpList.entrySet()) { - String key = entry.getKey().getKeyString(); + for (List<HandleAndLocalPath> cpList : sharedHandles) { + for (HandleAndLocalPath handleAndLocalPath : cpList) { + String key = handleAndLocalPath.getLocalPath(); int checkpointID = Integer.parseInt(String.valueOf(key.charAt(key.length() - 1))); VerificationMode verificationMode = checkpointVerify.apply(checkpointID); - verify(entry.getValue(), verificationMode).discardState(); + verify(handleAndLocalPath.getHandle(), verificationMode).discardState(); } } } @@ -4139,10 +4151,13 @@ public class CheckpointCoordinatorTest extends TestLogger { TestingStreamStateHandle privateState, TestingStreamStateHandle sharedState) throws CheckpointException { - Map<StateHandleID, 
StreamStateHandle> sharedStateMap = - new HashMap<>(singletonMap(new StateHandleID("shared-state-key"), sharedState)); - Map<StateHandleID, StreamStateHandle> privateStateMap = - new HashMap<>(singletonMap(new StateHandleID("private-state-key"), privateState)); + List<HandleAndLocalPath> sharedStateList = + new ArrayList<>( + singletonList(HandleAndLocalPath.of(sharedState, "shared-state-key"))); + List<HandleAndLocalPath> privateStateList = + new ArrayList<>( + singletonList(HandleAndLocalPath.of(privateState, "private-state-key"))); + ExecutionJobVertex jobVertex = graph.getJobVertex(ackVertexID); OperatorID operatorID = jobVertex.getOperatorIDs().get(0).getGeneratedOperatorID(); coordinator.receiveAcknowledgeMessage( @@ -4160,8 +4175,8 @@ public class CheckpointCoordinatorTest extends TestLogger { UUID.randomUUID(), KeyGroupRange.of(0, 9), checkpointId, - sharedStateMap, - privateStateMap, + sharedStateList, + privateStateList, metaState)) .build()))), "test"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java index 0a0cb1c5934..a124a6f6dbb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeOffsets; @@ -34,7 +35,6 @@ import 
org.apache.flink.runtime.state.KeyGroupsStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -256,18 +256,18 @@ public class CheckpointTestUtils { createRandomUUID(rnd), new KeyGroupRange(1, 1), checkpointId, - createRandomStateHandleMap(rnd), - createRandomStateHandleMap(rnd), + createRandomHandleAndLocalPathList(rnd), + createRandomHandleAndLocalPathList(rnd), createDummyStreamStateHandle(rnd, null)); } - public static Map<StateHandleID, StreamStateHandle> createRandomStateHandleMap(Random rnd) { + public static List<HandleAndLocalPath> createRandomHandleAndLocalPathList(Random rnd) { final int size = rnd.nextInt(4); - Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size); + List<HandleAndLocalPath> result = new ArrayList<>(size); for (int i = 0; i < size; ++i) { - StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString()); + String localPath = createRandomUUID(rnd).toString(); StreamStateHandle stateHandle = createDummyStreamStateHandle(rnd, null); - result.put(randomId, stateHandle); + result.add(HandleAndLocalPath.of(stateHandle, localPath)); } return result; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java index 9e21dd42e08..70421734f65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java @@ -33,13 +33,13 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryFactory; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; @@ -48,14 +48,12 @@ import org.apache.flink.util.concurrent.Executors; import org.junit.Test; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.Executor; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; @@ -94,11 +92,11 @@ public class SchedulerUtilsTest extends TestLogger { @Test public void testSharedStateRegistration() throws Exception { UUID backendId = UUID.randomUUID(); - StateHandleID key = new StateHandleID("k0"); + String localPath = "k0"; StreamStateHandle handle = new ByteStreamStateHandle("h0", new byte[] {1, 2, 3}); CheckpointRecoveryFactory recoveryFactory = buildRecoveryFactory( - buildCheckpoint(buildIncrementalHandle(key, handle, 
backendId))); + buildCheckpoint(buildIncrementalHandle(localPath, handle, backendId))); CompletedCheckpointStore checkpointStore = SchedulerUtils.createCompletedCheckpointStore( @@ -112,10 +110,20 @@ public class SchedulerUtilsTest extends TestLogger { SharedStateRegistry sharedStateRegistry = checkpointStore.getSharedStateRegistry(); IncrementalRemoteKeyedStateHandle newHandle = - buildIncrementalHandle(key, new PlaceholderStreamStateHandle(1L), backendId); + buildIncrementalHandle( + localPath, + new PlaceholderStreamStateHandle( + handle.getStreamStateHandleID(), handle.getStateSize()), + backendId); newHandle.registerSharedStates(sharedStateRegistry, 1L); - assertSame(handle, newHandle.getSharedState().get(key)); + assertSame( + handle, + newHandle.getSharedState().stream() + .filter(e -> e.getLocalPath().equals(localPath)) + .findFirst() + .get() + .getHandle()); } private CheckpointRecoveryFactory buildRecoveryFactory(CompletedCheckpoint checkpoint) { @@ -160,16 +168,16 @@ public class SchedulerUtilsTest extends TestLogger { } private IncrementalRemoteKeyedStateHandle buildIncrementalHandle( - StateHandleID key, StreamStateHandle shared, UUID backendIdentifier) { + String localPath, StreamStateHandle shared, UUID backendIdentifier) { StreamStateHandle meta = new ByteStreamStateHandle("meta", new byte[] {1, 2, 3}); - Map<StateHandleID, StreamStateHandle> sharedStateMap = new HashMap<>(); - sharedStateMap.put(key, shared); + List<HandleAndLocalPath> sharedState = new ArrayList<>(1); + sharedState.add(HandleAndLocalPath.of(shared, localPath)); return new IncrementalRemoteKeyedStateHandle( backendIdentifier, KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 1, - sharedStateMap, - emptyMap(), + sharedState, + emptyList(), meta); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java index 
b1128786e1e..b73e59f7dca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java @@ -19,14 +19,18 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.checkpoint.metadata.CheckpointTestUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.junit.Assert; import org.junit.Test; -import java.util.Map; +import java.util.Collections; +import java.util.List; import java.util.Random; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; +import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -48,12 +52,12 @@ public class IncrementalRemoteKeyedStateHandleTest { stateHandle.discardState(); - for (StreamStateHandle handle : stateHandle.getPrivateState().values()) { - verify(handle).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle.getPrivateState()) { + verify(handleAndLocalPath.getHandle()).discardState(); } - for (StreamStateHandle handle : stateHandle.getSharedState().values()) { - verify(handle).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle.getSharedState()) { + verify(handleAndLocalPath.getHandle()).discardState(); } verify(stateHandle.getMetaStateHandle()).discardState(); @@ -73,13 +77,11 @@ public class IncrementalRemoteKeyedStateHandleTest { IncrementalRemoteKeyedStateHandle stateHandle2 = create(new Random(42)); // Both handles should not be registered and not discarded by now. 
- for (Map.Entry<StateHandleID, StreamStateHandle> entry : - stateHandle1.getSharedState().entrySet()) { - verify(entry.getValue(), times(0)).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle1.getSharedState()) { + verify(handleAndLocalPath.getHandle(), times(0)).discardState(); } - for (Map.Entry<StateHandleID, StreamStateHandle> entry : - stateHandle2.getSharedState().entrySet()) { - verify(entry.getValue(), times(0)).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle2.getSharedState()) { + verify(handleAndLocalPath.getHandle(), times(0)).discardState(); } // Now we register both ... @@ -87,48 +89,40 @@ public class IncrementalRemoteKeyedStateHandleTest { registry.checkpointCompleted(0L); stateHandle2.registerSharedStates(registry, 0L); - for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry : - stateHandle1.getSharedState().entrySet()) { + for (HandleAndLocalPath handleAndLocalPath : stateHandle1.getSharedState()) { + StreamStateHandle handle = handleAndLocalPath.getHandle(); - SharedStateRegistryKey registryKey = - stateHandle1.createSharedStateRegistryKeyFromFileName( - stateHandleEntry.getKey()); + SharedStateRegistryKey registryKey = stateHandle1.createSharedStateRegistryKey(handle); - verify(registry).registerReference(registryKey, stateHandleEntry.getValue(), 0L); + verify(registry).registerReference(registryKey, handle, 0L); } - for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry : - stateHandle2.getSharedState().entrySet()) { + for (HandleAndLocalPath handleAndLocalPath : stateHandle2.getSharedState()) { + StreamStateHandle handle = handleAndLocalPath.getHandle(); - SharedStateRegistryKey registryKey = - stateHandle1.createSharedStateRegistryKeyFromFileName( - stateHandleEntry.getKey()); + SharedStateRegistryKey registryKey = stateHandle2.createSharedStateRegistryKey(handle); - verify(registry).registerReference(registryKey, stateHandleEntry.getValue(), 0L); + 
verify(registry).registerReference(registryKey, handle, 0L); } // We discard the first stateHandle1.discardState(); // Should be unregistered, non-shared discarded, shared not discarded - for (Map.Entry<StateHandleID, StreamStateHandle> entry : - stateHandle1.getSharedState().entrySet()) { - verify(entry.getValue(), times(0)).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle1.getSharedState()) { + verify(handleAndLocalPath.getHandle(), times(0)).discardState(); } - for (StreamStateHandle handle : stateHandle2.getSharedState().values()) { - - verify(handle, times(0)).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle2.getSharedState()) { + verify(handleAndLocalPath.getHandle(), times(0)).discardState(); } - for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry : - stateHandle1.getPrivateState().entrySet()) { - verify(handleEntry.getValue(), times(1)).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle1.getPrivateState()) { + verify(handleAndLocalPath.getHandle(), times(1)).discardState(); } - for (Map.Entry<StateHandleID, StreamStateHandle> handleEntry : - stateHandle2.getPrivateState().entrySet()) { - verify(handleEntry.getValue(), times(0)).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle2.getPrivateState()) { + verify(handleAndLocalPath.getHandle(), times(0)).discardState(); } verify(stateHandle1.getMetaStateHandle(), times(1)).discardState(); @@ -139,14 +133,12 @@ public class IncrementalRemoteKeyedStateHandleTest { // Now everything should be unregistered and discarded registry.unregisterUnusedState(Long.MAX_VALUE); - for (Map.Entry<StateHandleID, StreamStateHandle> entry : - stateHandle1.getSharedState().entrySet()) { - verify(entry.getValue()).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle1.getSharedState()) { + verify(handleAndLocalPath.getHandle()).discardState(); } - for (Map.Entry<StateHandleID, StreamStateHandle> entry : - 
stateHandle2.getSharedState().entrySet()) { - verify(entry.getValue()).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandle2.getSharedState()) { + verify(handleAndLocalPath.getHandle()).discardState(); } verify(stateHandle1.getMetaStateHandle(), times(1)).discardState(); @@ -209,14 +201,14 @@ public class IncrementalRemoteKeyedStateHandleTest { // Should be completely discarded because it is tracked through the new registry sharedStateRegistryB.unregisterUnusedState(1L); - for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) { - verify(stateHandle, times(1)).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandleX.getSharedState()) { + verify(handleAndLocalPath.getHandle(), times(1)).discardState(); } - for (StreamStateHandle stateHandle : stateHandleY.getSharedState().values()) { - verify(stateHandle, never()).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandleY.getSharedState()) { + verify(handleAndLocalPath.getHandle(), never()).discardState(); } - for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) { - verify(stateHandle, never()).discardState(); + for (HandleAndLocalPath handleAndLocalPath : stateHandleZ.getSharedState()) { + verify(handleAndLocalPath.getHandle(), never()).discardState(); } sharedStateRegistryB.close(); } @@ -242,13 +234,63 @@ public class IncrementalRemoteKeyedStateHandleTest { assertEquals(handle.getStateHandleId(), newHandle.getStateHandleId()); } + @Test + public void testConcurrentCheckpointSharedStateRegistration() throws Exception { + String localPath = "1.sst"; + StreamStateHandle streamHandle1 = new ByteStreamStateHandle("file-1", new byte[] {'s'}); + StreamStateHandle streamHandle2 = new ByteStreamStateHandle("file-2", new byte[] {'s'}); + + SharedStateRegistry registry = new SharedStateRegistryImpl(); + + UUID backendID = UUID.randomUUID(); + + IncrementalRemoteKeyedStateHandle handle1 = + new 
IncrementalRemoteKeyedStateHandle( + backendID, + KeyGroupRange.of(0, 0), + 1L, + placeSpies( + Collections.singletonList( + HandleAndLocalPath.of(streamHandle1, localPath))), + Collections.emptyList(), + new ByteStreamStateHandle("", new byte[] {'s'})); + + handle1.registerSharedStates(registry, handle1.getCheckpointId()); + + IncrementalRemoteKeyedStateHandle handle2 = + new IncrementalRemoteKeyedStateHandle( + backendID, + KeyGroupRange.of(0, 0), + 2L, + placeSpies( + Collections.singletonList( + HandleAndLocalPath.of(streamHandle2, localPath))), + Collections.emptyList(), + new ByteStreamStateHandle("", new byte[] {'s'})); + + handle2.registerSharedStates(registry, handle2.getCheckpointId()); + + registry.checkpointCompleted(1L); + + // checkpoint 2 failed + handle2.discardState(); + + for (HandleAndLocalPath handleAndLocalPath : handle1.getSharedState()) { + verify(handleAndLocalPath.getHandle(), never()).discardState(); + } + for (HandleAndLocalPath handleAndLocalPath : handle2.getSharedState()) { + verify(handleAndLocalPath.getHandle(), never()).discardState(); + } + registry.close(); + } + private static IncrementalRemoteKeyedStateHandle create(Random rnd) { return new IncrementalRemoteKeyedStateHandle( UUID.nameUUIDFromBytes("test".getBytes()), KeyGroupRange.of(0, 0), 1L, - placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)), - placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)), + placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)), + placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)), spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd, null))); } @@ -257,18 +299,15 @@ public class IncrementalRemoteKeyedStateHandleTest { UUID.nameUUIDFromBytes("test".getBytes()), KeyGroupRange.of(0, 0), 1L, - placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)), - placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)), + 
placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)), + placeSpies(CheckpointTestUtils.createRandomHandleAndLocalPathList(rnd)), spy(CheckpointTestUtils.createDummyStreamStateHandle(rnd, null)), checkpointedSize); } - private static Map<StateHandleID, StreamStateHandle> placeSpies( - Map<StateHandleID, StreamStateHandle> map) { - - for (Map.Entry<StateHandleID, StreamStateHandle> entry : map.entrySet()) { - entry.setValue(spy(entry.getValue())); - } - return map; + private static List<HandleAndLocalPath> placeSpies(List<HandleAndLocalPath> list) { + return list.stream() + .map(e -> HandleAndLocalPath.of(spy(e.getHandle()), e.getLocalPath())) + .collect(Collectors.toList()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java index 625b3a8ea64..959d7b34243 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java @@ -290,8 +290,8 @@ public class SharedStateRegistryTest { UUID.randomUUID(), KeyGroupRange.EMPTY_KEY_GROUP_RANGE, 1L, - Collections.emptyMap(), - Collections.emptyMap(), + Collections.emptyList(), + Collections.emptyList(), new ByteStreamStateHandle("meta", new byte[1]), 1024L, new StateHandleID(stateId)); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 3df43d7224e..6b3eaee04ea 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -38,14 +38,13 @@ import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder; import org.apache.flink.runtime.state.BackendBuildingException; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.PriorityQueueSetFactory; import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamCompressionDecorator; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory; import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper; import org.apache.flink.runtime.state.heap.InternalKeyContext; @@ -306,8 +305,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken try { // Variables for snapshot strategy when incremental checkpoint is enabled UUID backendUID = UUID.randomUUID(); - SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = - new TreeMap<>(); + SortedMap<Long, Collection<HandleAndLocalPath>> materializedSstFiles = new TreeMap<>(); long lastCompletedCheckpointId = -1L; if (injectedTestDB != null) { db = injectedTestDB; @@ -525,7 +523,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken int keyGroupPrefixBytes, RocksDB db, UUID backendUID, - SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles, + SortedMap<Long, Collection<HandleAndLocalPath>> 
materializedSstFiles, long lastCompletedCheckpointId) { RocksDBSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy; RocksDBStateUploader stateUploader = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java index d06790ced8a..6137ed4112e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java @@ -19,6 +19,7 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; @@ -33,7 +34,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -56,9 +56,8 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { CloseableRegistry closeableRegistry) throws Exception { - final Map<StateHandleID, StreamStateHandle> sstFiles = restoreStateHandle.getSharedState(); - final Map<StateHandleID, StreamStateHandle> miscFiles = - restoreStateHandle.getPrivateState(); + final List<HandleAndLocalPath> sstFiles = restoreStateHandle.getSharedState(); + final List<HandleAndLocalPath> miscFiles = restoreStateHandle.getPrivateState(); downloadDataForAllStateHandles(sstFiles, dest, closeableRegistry); 
downloadDataForAllStateHandles(miscFiles, dest, closeableRegistry); @@ -69,14 +68,15 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { * files w.r.t. their {@link StateHandleID}. */ private void downloadDataForAllStateHandles( - Map<StateHandleID, StreamStateHandle> stateHandleMap, + List<HandleAndLocalPath> handleWithPaths, Path restoreInstancePath, CloseableRegistry closeableRegistry) throws Exception { try { List<Runnable> runnables = - createDownloadRunnables(stateHandleMap, restoreInstancePath, closeableRegistry); + createDownloadRunnables( + handleWithPaths, restoreInstancePath, closeableRegistry); List<CompletableFuture<Void>> futures = new ArrayList<>(runnables.size()); for (Runnable runnable : runnables) { futures.add(CompletableFuture.runAsync(runnable, executorService)); @@ -94,15 +94,15 @@ public class RocksDBStateDownloader extends RocksDBStateDataTransfer { } private List<Runnable> createDownloadRunnables( - Map<StateHandleID, StreamStateHandle> stateHandleMap, + List<HandleAndLocalPath> handleWithPaths, Path restoreInstancePath, CloseableRegistry closeableRegistry) { - List<Runnable> runnables = new ArrayList<>(stateHandleMap.size()); - for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) { - StateHandleID stateHandleID = entry.getKey(); - StreamStateHandle remoteFileHandle = entry.getValue(); + List<Runnable> runnables = new ArrayList<>(handleWithPaths.size()); + for (HandleAndLocalPath handleAndLocalPath : handleWithPaths) { - Path path = restoreInstancePath.resolve(stateHandleID.toString()); + StreamStateHandle remoteFileHandle = handleAndLocalPath.getHandle(); + + Path path = restoreInstancePath.resolve(handleAndLocalPath.getLocalPath()); runnables.add( ThrowingRunnable.unchecked( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java index 02270e82d7d..983450fad75 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploader.java @@ -22,7 +22,7 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; -import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.ExceptionUtils; @@ -37,11 +37,11 @@ import java.io.IOException; import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; -import java.util.HashMap; -import java.util.Map; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.function.Supplier; +import java.util.stream.Collectors; /** Help class for uploading RocksDB state files. */ public class RocksDBStateUploader extends RocksDBStateDataTransfer { @@ -59,17 +59,15 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer { * @param stateScope * @throws Exception Thrown if can not upload all the files. 
*/ - public Map<StateHandleID, StreamStateHandle> uploadFilesToCheckpointFs( - @Nonnull Map<StateHandleID, Path> files, + public List<HandleAndLocalPath> uploadFilesToCheckpointFs( + @Nonnull List<Path> files, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) throws Exception { - Map<StateHandleID, StreamStateHandle> handles = new HashMap<>(); - - Map<StateHandleID, CompletableFuture<StreamStateHandle>> futures = + List<CompletableFuture<HandleAndLocalPath>> futures = createUploadFutures( files, checkpointStreamFactory, @@ -77,12 +75,13 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer { closeableRegistry, tmpResourcesRegistry); + List<HandleAndLocalPath> handles = new ArrayList<>(files.size()); + try { - FutureUtils.waitForAll(futures.values()).get(); + FutureUtils.waitForAll(futures).get(); - for (Map.Entry<StateHandleID, CompletableFuture<StreamStateHandle>> entry : - futures.entrySet()) { - handles.put(entry.getKey(), entry.getValue().get()); + for (CompletableFuture<HandleAndLocalPath> future : futures) { + handles.add(future.get()); } } catch (ExecutionException e) { Throwable throwable = ExceptionUtils.stripExecutionException(e); @@ -97,32 +96,29 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer { return handles; } - private Map<StateHandleID, CompletableFuture<StreamStateHandle>> createUploadFutures( - Map<StateHandleID, Path> files, + private List<CompletableFuture<HandleAndLocalPath>> createUploadFutures( + List<Path> files, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, CloseableRegistry closeableRegistry, CloseableRegistry tmpResourcesRegistry) { - Map<StateHandleID, CompletableFuture<StreamStateHandle>> futures = - new HashMap<>(files.size()); - - for (Map.Entry<StateHandleID, Path> entry : files.entrySet()) { - final Supplier<StreamStateHandle> supplier = - 
CheckedSupplier.unchecked( - () -> - uploadLocalFileToCheckpointFs( - entry.getValue(), - checkpointStreamFactory, - stateScope, - closeableRegistry, - tmpResourcesRegistry)); - futures.put(entry.getKey(), CompletableFuture.supplyAsync(supplier, executorService)); - } - - return futures; + return files.stream() + .map( + e -> + CompletableFuture.supplyAsync( + CheckedSupplier.unchecked( + () -> + uploadLocalFileToCheckpointFs( + e, + checkpointStreamFactory, + stateScope, + closeableRegistry, + tmpResourcesRegistry)), + executorService)) + .collect(Collectors.toList()); } - private StreamStateHandle uploadLocalFileToCheckpointFs( + private HandleAndLocalPath uploadLocalFileToCheckpointFs( Path filePath, CheckpointStreamFactory checkpointStreamFactory, CheckpointedStateScope stateScope, @@ -161,7 +157,7 @@ public class RocksDBStateUploader extends RocksDBStateDataTransfer { } tmpResourcesRegistry.registerCloseable( () -> StateUtil.discardStateObjectQuietly(result)); - return result; + return HandleAndLocalPath.of(result, filePath.getFileName().toString()); } finally { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java index d6ec9ae6055..5a89403617e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java @@ -36,13 +36,13 @@ import org.apache.flink.runtime.state.BackendBuildingException; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; import org.apache.flink.runtime.state.DirectoryStateHandle; import 
org.apache.flink.runtime.state.IncrementalKeyedStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateSerializerProvider; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; @@ -86,7 +86,7 @@ public class RocksDBIncrementalRestoreOperation<K> implements RocksDBRestoreOper LoggerFactory.getLogger(RocksDBIncrementalRestoreOperation.class); private final String operatorIdentifier; - private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> restoredSstFiles; + private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles; private final RocksDBHandle rocksHandle; private final Collection<KeyedStateHandle> restoreStateHandles; private final CloseableRegistry cancelStreamRegistry; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java index 87f124fe6d0..ad17b6f2769 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBRestoreResult.java @@ -19,13 +19,12 @@ package 
org.apache.flink.contrib.streaming.state.restore; import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor; -import org.apache.flink.runtime.state.StateHandleID; -import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDB; -import java.util.Map; +import java.util.Collection; import java.util.SortedMap; import java.util.UUID; @@ -38,7 +37,7 @@ public class RocksDBRestoreResult { // fields only for incremental restore private final long lastCompletedCheckpointId; private final UUID backendUID; - private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> restoredSstFiles; + private final SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles; public RocksDBRestoreResult( RocksDB db, @@ -46,7 +45,7 @@ public class RocksDBRestoreResult { RocksDBNativeMetricMonitor nativeMetricMonitor, long lastCompletedCheckpointId, UUID backendUID, - SortedMap<Long, Map<StateHandleID, StreamStateHandle>> restoredSstFiles) { + SortedMap<Long, Collection<HandleAndLocalPath>> restoredSstFiles) { this.db = db; this.defaultColumnFamilyHandle = defaultColumnFamilyHandle; this.nativeMetricMonitor = nativeMetricMonitor; @@ -67,7 +66,7 @@ public class RocksDBRestoreResult { return backendUID; } - public SortedMap<Long, Map<StateHandleID, StreamStateHandle>> getRestoredSstFiles() { + public SortedMap<Long, Collection<HandleAndLocalPath>> getRestoredSstFiles() { return restoredSstFiles; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java index 6db43289cad..9cb73d01dc9 100644 --- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.DirectoryStateHandle; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedBackendSerializationProxy; @@ -39,7 +40,6 @@ import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResources; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.SnapshotStrategy; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; @@ -61,12 +61,14 @@ import java.io.File; import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; /** * Abstract base class for {@link SnapshotStrategy} implementations for RocksDB state backend. 
@@ -334,7 +336,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources protected Optional<KeyedStateHandle> getLocalSnapshot( @Nullable StreamStateHandle localStreamStateHandle, - Map<StateHandleID, StreamStateHandle> sharedStateHandleIDs) + List<HandleAndLocalPath> sharedState) throws IOException { final DirectoryStateHandle directoryStateHandle = localBackupDirectory.completeSnapshotAndGetHandle(); @@ -346,7 +348,7 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources directoryStateHandle, keyGroupRange, localStreamStateHandle, - sharedStateHandleIDs)); + sharedState)); } else { return Optional.empty(); } @@ -390,23 +392,33 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources } protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT = - new PreviousSnapshot(Collections.emptyMap()); + new PreviousSnapshot(Collections.emptyList()); /** Previous snapshot with uploaded sst files. */ protected static class PreviousSnapshot { - @Nullable private final Map<StateHandleID, Long> confirmedSstFiles; - - protected PreviousSnapshot(@Nullable Map<StateHandleID, Long> confirmedSstFiles) { - this.confirmedSstFiles = confirmedSstFiles; + @Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles; + + protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) { + this.confirmedSstFiles = + confirmedSstFiles != null + ? 
confirmedSstFiles.stream() + .collect( + Collectors.toMap( + HandleAndLocalPath::getLocalPath, + HandleAndLocalPath::getHandle)) + : Collections.emptyMap(); } - protected Optional<StreamStateHandle> getUploaded(StateHandleID stateHandleID) { - if (confirmedSstFiles != null && confirmedSstFiles.containsKey(stateHandleID)) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) + protected Optional<StreamStateHandle> getUploaded(String filename) { + if (confirmedSstFiles.containsKey(filename)) { + StreamStateHandle handle = confirmedSstFiles.get(filename); + // We introduce a placeholder state handle to reduce network transfer overhead, + // it will be replaced by the original handle from the shared state registry + // (created from a previous checkpoint). return Optional.of( - new PlaceholderStreamStateHandle(confirmedSstFiles.get(stateHandleID))); + new PlaceholderStreamStateHandle( + handle.getStreamStateHandleID(), handle.getStateSize())); } else { // Don't use any uploaded but not confirmed handles because they might be deleted // (by TM) if the previous checkpoint failed. 
See FLINK-25395 diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index f93f41fbaa0..9eb66a5ee8c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -27,14 +27,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.SnapshotType; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; -import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResult; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; @@ -49,7 +48,9 @@ import javax.annotation.Nonnull; import java.io.File; import java.nio.file.Path; -import java.util.HashMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -57,10 +58,8 @@ import 
java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; -import java.util.stream.Collectors; import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX; -import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize; /** * Snapshot strategy for {@link org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend} @@ -78,11 +77,11 @@ public class RocksIncrementalSnapshotStrategy<K> private static final String DESCRIPTION = "Asynchronous incremental RocksDB snapshot"; /** - * Stores the {@link StateHandleID IDs} of uploaded SST files that build the incremental - * history. Once the checkpoint is confirmed by JM, only the ID paired with {@link - * PlaceholderStreamStateHandle} can be sent. + * Stores the {@link StreamStateHandle} and corresponding local path of uploaded SST files that + * build the incremental history. Once the checkpoint is confirmed by JM, they can be reused for + * incremental checkpoint. */ - @Nonnull private final SortedMap<Long, Map<StateHandleID, Long>> uploadedStateIDs; + @Nonnull private final SortedMap<Long, Collection<HandleAndLocalPath>> uploadedSstFiles; /** The identifier of the last completed checkpoint. 
*/ private long lastCompletedCheckpointId; @@ -101,7 +100,7 @@ public class RocksIncrementalSnapshotStrategy<K> @Nonnull CloseableRegistry cancelStreamRegistry, @Nonnull File instanceBasePath, @Nonnull UUID backendUID, - @Nonnull SortedMap<Long, Map<StateHandleID, StreamStateHandle>> uploadedStateHandles, + @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> uploadedStateHandles, @Nonnull RocksDBStateUploader rocksDBStateUploader, long lastCompletedCheckpointId) { @@ -116,16 +115,8 @@ public class RocksIncrementalSnapshotStrategy<K> localRecoveryConfig, instanceBasePath, backendUID); - this.uploadedStateIDs = new TreeMap<>(); - for (Map.Entry<Long, Map<StateHandleID, StreamStateHandle>> entry : - uploadedStateHandles.entrySet()) { - Map<StateHandleID, Long> map = new HashMap<>(); - for (Map.Entry<StateHandleID, StreamStateHandle> stateHandleEntry : - entry.getValue().entrySet()) { - map.put(stateHandleEntry.getKey(), stateHandleEntry.getValue().getStateSize()); - } - this.uploadedStateIDs.put(entry.getKey(), map); - } + + this.uploadedSstFiles = new TreeMap<>(uploadedStateHandles); this.stateUploader = rocksDBStateUploader; this.lastCompletedCheckpointId = lastCompletedCheckpointId; } @@ -174,13 +165,13 @@ public class RocksIncrementalSnapshotStrategy<K> @Override public void notifyCheckpointComplete(long completedCheckpointId) { - synchronized (uploadedStateIDs) { + synchronized (uploadedSstFiles) { // FLINK-23949: materializedSstFiles.keySet().contains(completedCheckpointId) make sure // the notified checkpointId is not a savepoint, otherwise next checkpoint will // degenerate into a full checkpoint if (completedCheckpointId > lastCompletedCheckpointId - && uploadedStateIDs.containsKey(completedCheckpointId)) { - uploadedStateIDs + && uploadedSstFiles.containsKey(completedCheckpointId)) { + uploadedSstFiles .keySet() .removeIf(checkpointId -> checkpointId < completedCheckpointId); lastCompletedCheckpointId = completedCheckpointId; @@ -190,8 +181,8 @@ public 
class RocksIncrementalSnapshotStrategy<K> @Override public void notifyCheckpointAborted(long abortedCheckpointId) { - synchronized (uploadedStateIDs) { - uploadedStateIDs.keySet().remove(abortedCheckpointId); + synchronized (uploadedSstFiles) { + uploadedSstFiles.keySet().remove(abortedCheckpointId); } } @@ -205,13 +196,12 @@ public class RocksIncrementalSnapshotStrategy<K> long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) { final long lastCompletedCheckpoint; - final Map<StateHandleID, Long> confirmedSstFiles; - final Map<StateHandleID, StreamStateHandle> uploadedSstFiles; + final Collection<HandleAndLocalPath> confirmedSstFiles; // use the last completed checkpoint as the comparison base. - synchronized (uploadedStateIDs) { + synchronized (uploadedSstFiles) { lastCompletedCheckpoint = lastCompletedCheckpointId; - confirmedSstFiles = uploadedStateIDs.get(lastCompletedCheckpoint); + confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint); LOG.trace( "Use confirmed SST files for checkpoint {}: {}", checkpointId, @@ -268,9 +258,9 @@ public class RocksIncrementalSnapshotStrategy<K> // Handle to the meta data file SnapshotResult<StreamStateHandle> metaStateHandle = null; // Handles to new sst files since the last completed checkpoint will go here - final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>(); + final List<HandleAndLocalPath> sstFiles = new ArrayList<>(); // Handles to the misc files in the current snapshot will go here - final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>(); + final List<HandleAndLocalPath> miscFiles = new ArrayList<>(); try { @@ -288,11 +278,13 @@ public class RocksIncrementalSnapshotStrategy<K> metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created."); - uploadSstFiles( - sstFiles, miscFiles, snapshotCloseableRegistry, tmpResourcesRegistry); long checkpointedSize = metaStateHandle.getStateSize(); - checkpointedSize += 
getUploadedStateSize(sstFiles.values()); - checkpointedSize += getUploadedStateSize(miscFiles.values()); + checkpointedSize += + uploadSnapshotFiles( + sstFiles, + miscFiles, + snapshotCloseableRegistry, + tmpResourcesRegistry); // We make the 'sstFiles' as the 'sharedState' in IncrementalRemoteKeyedStateHandle, // whether they belong to the sharded CheckpointedStateScope or exclusive @@ -334,9 +326,10 @@ public class RocksIncrementalSnapshotStrategy<K> } } - private void uploadSstFiles( - @Nonnull Map<StateHandleID, StreamStateHandle> sstFiles, - @Nonnull Map<StateHandleID, StreamStateHandle> miscFiles, + /** upload files and return total uploaded size. */ + private long uploadSnapshotFiles( + @Nonnull List<HandleAndLocalPath> sstFiles, + @Nonnull List<HandleAndLocalPath> miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception { @@ -344,43 +337,47 @@ public class RocksIncrementalSnapshotStrategy<K> // write state data Preconditions.checkState(localBackupDirectory.exists()); - Map<StateHandleID, Path> sstFilePaths = new HashMap<>(); - Map<StateHandleID, Path> miscFilePaths = new HashMap<>(); - Path[] files = localBackupDirectory.listDirectory(); + long uploadedSize = 0; if (files != null) { + List<Path> sstFilePaths = new ArrayList<>(files.length); + List<Path> miscFilePaths = new ArrayList<>(files.length); + createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths); final CheckpointedStateScope stateScope = sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING ? 
CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED; - sstFiles.putAll( + + List<HandleAndLocalPath> sstFilesUploadResult = stateUploader.uploadFilesToCheckpointFs( sstFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, - tmpResourcesRegistry)); - miscFiles.putAll( + tmpResourcesRegistry); + uploadedSize += + sstFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum(); + sstFiles.addAll(sstFilesUploadResult); + + List<HandleAndLocalPath> miscFilesUploadResult = stateUploader.uploadFilesToCheckpointFs( miscFilePaths, checkpointStreamFactory, stateScope, snapshotCloseableRegistry, - tmpResourcesRegistry)); + tmpResourcesRegistry); + uploadedSize += + miscFilesUploadResult.stream().mapToLong(e -> e.getStateSize()).sum(); + miscFiles.addAll(miscFilesUploadResult); - synchronized (uploadedStateIDs) { + synchronized (uploadedSstFiles) { switch (sharingFilesStrategy) { case FORWARD_BACKWARD: case FORWARD: - uploadedStateIDs.put( - checkpointId, - sstFiles.entrySet().stream() - .collect( - Collectors.toMap( - Map.Entry::getKey, - t -> t.getValue().getStateSize()))); + uploadedSstFiles.put( + checkpointId, Collections.unmodifiableList(sstFiles)); break; case NO_SHARING: break; @@ -392,27 +389,26 @@ public class RocksIncrementalSnapshotStrategy<K> } } } + return uploadedSize; } private void createUploadFilePaths( Path[] files, - Map<StateHandleID, StreamStateHandle> sstFiles, - Map<StateHandleID, Path> sstFilePaths, - Map<StateHandleID, Path> miscFilePaths) { + List<HandleAndLocalPath> sstFiles, + List<Path> sstFilePaths, + List<Path> miscFilePaths) { for (Path filePath : files) { final String fileName = filePath.getFileName().toString(); - final StateHandleID stateHandleID = new StateHandleID(fileName); if (fileName.endsWith(SST_FILE_SUFFIX)) { - Optional<StreamStateHandle> uploaded = - previousSnapshot.getUploaded(stateHandleID); + Optional<StreamStateHandle> uploaded = previousSnapshot.getUploaded(fileName); if 
(uploaded.isPresent()) { - sstFiles.put(stateHandleID, uploaded.get()); + sstFiles.add(HandleAndLocalPath.of(uploaded.get(), fileName)); } else { - sstFilePaths.put(stateHandleID, filePath); // re-upload + sstFilePaths.add(filePath); // re-upload } } else { - miscFilePaths.put(stateHandleID, filePath); + miscFilePaths.add(filePath); } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java index a6d4f843d7d..3fc45862dc8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksNativeFullSnapshotStrategy.java @@ -26,13 +26,13 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.LocalRecoveryConfig; import org.apache.flink.runtime.state.SnapshotDirectory; import org.apache.flink.runtime.state.SnapshotResult; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.Preconditions; @@ -45,16 +45,15 @@ import javax.annotation.Nonnull; import java.io.File; 
import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.UUID; -import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.getUploadedStateSize; - /** * Snapshot strategy for {@link RocksDBKeyedStateBackend} based on RocksDB's native checkpoints and * creates full snapshots. the difference between savepoint is that sst files will be uploaded @@ -165,7 +164,7 @@ public class RocksNativeFullSnapshotStrategy<K> // Handle to the meta data file SnapshotResult<StreamStateHandle> metaStateHandle = null; // Handles to all the files in the current snapshot will go here - final Map<StateHandleID, StreamStateHandle> privateFiles = new HashMap<>(); + final List<HandleAndLocalPath> privateFiles = new ArrayList<>(); try { @@ -183,23 +182,25 @@ public class RocksNativeFullSnapshotStrategy<K> metaStateHandle.getJobManagerOwnedSnapshot(), "Metadata for job manager was not properly created."); - uploadSstFiles(privateFiles, snapshotCloseableRegistry, tmpResourcesRegistry); long checkpointedSize = metaStateHandle.getStateSize(); - checkpointedSize += getUploadedStateSize(privateFiles.values()); + + checkpointedSize += + uploadSnapshotFiles( + privateFiles, snapshotCloseableRegistry, tmpResourcesRegistry); final IncrementalRemoteKeyedStateHandle jmIncrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle( backendUID, keyGroupRange, checkpointId, - Collections.emptyMap(), + Collections.emptyList(), privateFiles, metaStateHandle.getJobManagerOwnedSnapshot(), checkpointedSize); Optional<KeyedStateHandle> localSnapshot = getLocalSnapshot( - metaStateHandle.getTaskLocalSnapshot(), Collections.emptyMap()); + metaStateHandle.getTaskLocalSnapshot(), Collections.emptyList()); final SnapshotResult<KeyedStateHandle> snapshotResult = localSnapshot .map( @@ -219,8 
+220,9 @@ public class RocksNativeFullSnapshotStrategy<K> } } - private void uploadSstFiles( - @Nonnull Map<StateHandleID, StreamStateHandle> privateFiles, + /** upload files and return total uploaded size. */ + private long uploadSnapshotFiles( + @Nonnull List<HandleAndLocalPath> privateFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, @Nonnull CloseableRegistry tmpResourcesRegistry) throws Exception { @@ -228,25 +230,23 @@ public class RocksNativeFullSnapshotStrategy<K> // write state data Preconditions.checkState(localBackupDirectory.exists()); - Map<StateHandleID, Path> privateFilePaths = new HashMap<>(); - Path[] files = localBackupDirectory.listDirectory(); + long uploadedSize = 0; if (files != null) { // all sst files are private in full snapshot - for (Path filePath : files) { - final String fileName = filePath.getFileName().toString(); - final StateHandleID stateHandleID = new StateHandleID(fileName); - privateFilePaths.put(stateHandleID, filePath); - } - - privateFiles.putAll( + List<HandleAndLocalPath> uploadedFiles = stateUploader.uploadFilesToCheckpointFs( - privateFilePaths, + Arrays.asList(files), checkpointStreamFactory, CheckpointedStateScope.EXCLUSIVE, snapshotCloseableRegistry, - tmpResourcesRegistry)); + tmpResourcesRegistry); + + uploadedSize += uploadedFiles.stream().mapToLong(e -> e.getStateSize()).sum(); + + privateFiles.addAll(uploadedFiles); } + return uploadedSize; } } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java index 10425708b4f..cf0692c6777 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java +++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksSnapshotUtil.java @@ -18,12 +18,6 @@ package org.apache.flink.contrib.streaming.state.snapshot; -import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; -import org.apache.flink.runtime.state.StateObject; -import org.apache.flink.runtime.state.StreamStateHandle; - -import java.util.Collection; - /** * Utility methods and constants around RocksDB creating and restoring snapshots for {@link * org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend}. @@ -36,11 +30,4 @@ public class RocksSnapshotUtil { private RocksSnapshotUtil() { throw new AssertionError(); } - - public static long getUploadedStateSize(Collection<StreamStateHandle> streamStateHandles) { - return streamStateHandles.stream() - .filter(s -> !(s instanceof PlaceholderStreamStateHandle)) - .mapToLong(StateObject::getStateSize) - .sum(); - } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index 4d6b533d662..642ff87e248 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -33,14 +33,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend; import org.apache.flink.runtime.state.ConfigurableStateBackend; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import 
org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryImpl; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackendTestBase; -import org.apache.flink.runtime.state.StateHandleID; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage; @@ -78,12 +77,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Queue; import java.util.concurrent.RunnableFuture; +import java.util.stream.Collectors; import static junit.framework.TestCase.assertNotNull; import static org.junit.Assert.assertEquals; @@ -582,17 +580,23 @@ public class EmbeddedRocksDBStateBackendTest (IncrementalRemoteKeyedStateHandle) snapshotResult.getJobManagerOwnedSnapshot(); - Map<StateHandleID, StreamStateHandle> sharedState = - new HashMap<>(stateHandle.getSharedState()); + // create new HandleAndLocalPath object for keeping handle before replacement + List<HandleAndLocalPath> sharedState = + stateHandle.getSharedState().stream() + .map( + e -> + HandleAndLocalPath.of( + e.getHandle(), e.getLocalPath())) + .collect(Collectors.toList()); stateHandle.registerSharedStates(sharedStateRegistry, checkpointId); - for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) { + for (HandleAndLocalPath handleAndLocalPath : sharedState) { verify(sharedStateRegistry) .registerReference( - stateHandle.createSharedStateRegistryKeyFromFileName( - e.getKey()), - e.getValue(), + 
stateHandle.createSharedStateRegistryKey( + handleAndLocalPath.getHandle()), + handleAndLocalPath.getHandle(), checkpointId); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java index fcce8674887..e19ee84d63f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java @@ -20,9 +20,9 @@ package org.apache.flink.contrib.streaming.state; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestStreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -37,9 +37,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.Random; import java.util.UUID; @@ -59,8 +57,8 @@ public class RocksDBStateDownloaderTest extends TestLogger { new SpecifiedException("throw exception while multi thread restore."); StreamStateHandle stateHandle = new ThrowingStateHandle(expectedException); - Map<StateHandleID, StreamStateHandle> stateHandles = new HashMap<>(1); - stateHandles.put(new StateHandleID("state1"), stateHandle); + 
List<HandleAndLocalPath> stateHandles = new ArrayList<>(1); + stateHandles.add(HandleAndLocalPath.of(stateHandle, "state1")); IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = new IncrementalRemoteKeyedStateHandle( @@ -98,12 +96,13 @@ public class RocksDBStateDownloaderTest extends TestLogger { handles.add(new ByteStreamStateHandle(String.format("state%d", i), contents[i])); } - Map<StateHandleID, StreamStateHandle> sharedStates = new HashMap<>(contentNum); - Map<StateHandleID, StreamStateHandle> privateStates = new HashMap<>(contentNum); + List<HandleAndLocalPath> sharedStates = new ArrayList<>(contentNum); + List<HandleAndLocalPath> privateStates = new ArrayList<>(contentNum); for (int i = 0; i < contentNum; ++i) { - sharedStates.put(new StateHandleID(String.format("sharedState%d", i)), handles.get(i)); - privateStates.put( - new StateHandleID(String.format("privateState%d", i)), handles.get(i)); + sharedStates.add( + HandleAndLocalPath.of(handles.get(i), String.format("sharedState%d", i))); + privateStates.add( + HandleAndLocalPath.of(handles.get(i), String.format("privateState%d", i))); } IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java index 7af62c83705..60e0fee9d79 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateUploaderTest.java @@ -24,7 +24,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.CheckpointStreamFactory; import 
org.apache.flink.runtime.state.CheckpointedStateScope; -import org.apache.flink.runtime.state.StateHandleID; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; @@ -42,10 +42,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; @@ -90,8 +89,8 @@ public class RocksDBStateUploaderTest extends TestLogger { File file = TempDirUtils.newFile(temporaryFolder, String.valueOf(UUID.randomUUID())); generateRandomFileContent(file.getPath(), 20); - Map<StateHandleID, Path> filePaths = new HashMap<>(1); - filePaths.put(new StateHandleID("mockHandleID"), file.toPath()); + List<Path> filePaths = new ArrayList<>(1); + filePaths.add(file.toPath()); try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) { assertThatThrownBy( () -> @@ -134,7 +133,7 @@ public class RocksDBStateUploaderTest extends TestLogger { String localFolder = "local"; TempDirUtils.newFolder(temporaryFolder, localFolder); - Map<StateHandleID, Path> filePaths = + List<Path> filePaths = generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold); CloseableRegistry tmpResourcesRegistry = new CloseableRegistry(); try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(1)) { @@ -165,12 +164,11 @@ public class RocksDBStateUploaderTest extends TestLogger { assertThat(checkpointPrivateFolder.list()).isEmpty(); assertThat(checkpointSharedFolder.list()).isEmpty(); - Map.Entry<StateHandleID, Path> first = 
filePaths.entrySet().stream().findFirst().get(); + Path first = filePaths.stream().findFirst().get(); assertThatThrownBy( () -> rocksDBStateUploader.uploadFilesToCheckpointFs( - Collections.singletonMap( - first.getKey(), first.getValue()), + Collections.singletonList(first), checkpointStreamFactory, CheckpointedStateScope.SHARED, new CloseableRegistry(), @@ -210,11 +208,11 @@ public class RocksDBStateUploaderTest extends TestLogger { TempDirUtils.newFolder(temporaryFolder, localFolder); int sstFileCount = 6; - Map<StateHandleID, Path> sstFilePaths = + List<Path> sstFilePaths = generateRandomSstFiles(localFolder, sstFileCount, fileStateSizeThreshold); try (RocksDBStateUploader rocksDBStateUploader = new RocksDBStateUploader(5)) { - Map<StateHandleID, StreamStateHandle> sstFiles = + List<HandleAndLocalPath> sstFiles = rocksDBStateUploader.uploadFilesToCheckpointFs( sstFilePaths, checkpointStreamFactory, @@ -222,9 +220,15 @@ public class RocksDBStateUploaderTest extends TestLogger { new CloseableRegistry(), new CloseableRegistry()); - for (Map.Entry<StateHandleID, Path> entry : sstFilePaths.entrySet()) { + for (Path path : sstFilePaths) { assertStateContentEqual( - entry.getValue(), sstFiles.get(entry.getKey()).openInputStream()); + path, + sstFiles.stream() + .filter(e -> e.getLocalPath().equals(path.getFileName().toString())) + .findFirst() + .get() + .getHandle() + .openInputStream()); } } } @@ -259,18 +263,18 @@ public class RocksDBStateUploaderTest extends TestLogger { }; } - private Map<StateHandleID, Path> generateRandomSstFiles( + private List<Path> generateRandomSstFiles( String localFolder, int sstFileCount, int fileStateSizeThreshold) throws IOException { ThreadLocalRandom random = ThreadLocalRandom.current(); - Map<StateHandleID, Path> sstFilePaths = new HashMap<>(sstFileCount); + List<Path> sstFilePaths = new ArrayList<>(sstFileCount); for (int i = 0; i < sstFileCount; ++i) { File file = TempDirUtils.newFile( temporaryFolder, String.format("%s/%d.sst", 
localFolder, i)); generateRandomFileContent( file.getPath(), random.nextInt(1_000_000) + fileStateSizeThreshold); - sstFilePaths.put(new StateHandleID(String.valueOf(i)), file.toPath()); + sstFilePaths.add(file.toPath()); } return sstFilePaths; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java index 4e4a85aaf66..a3469a24306 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java @@ -28,12 +28,10 @@ import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.state.ArrayListSerializer; import org.apache.flink.runtime.state.CompositeKeySerializationUtils; +import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.PlaceholderStreamStateHandle; import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo; -import org.apache.flink.runtime.state.StateHandleID; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory; import org.apache.flink.util.ResourceGuard; @@ -49,8 +47,8 @@ import org.rocksdb.RocksDBException; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import 
java.util.Collection; import java.util.LinkedHashMap; -import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; @@ -89,16 +87,11 @@ public class RocksIncrementalSnapshotStrategyTest { checkpointStreamFactory, closeableRegistry); - // If 3rd checkpoint's placeholderStateHandleCount > 0,it means 3rd checkpoint is + // If 3rd checkpoint's full size > checkpointed size, it means 3rd checkpoint is // incremental. - Map<StateHandleID, StreamStateHandle> sharedState3 = - incrementalRemoteKeyedStateHandle3.getSharedState(); - long placeholderStateHandleCount = - sharedState3.entrySet().stream() - .filter(e -> e.getValue() instanceof PlaceholderStreamStateHandle) - .count(); - - Assert.assertTrue(placeholderStateHandleCount > 0); + Assert.assertTrue( + incrementalRemoteKeyedStateHandle3.getStateSize() + > incrementalRemoteKeyedStateHandle3.getCheckpointedSize()); } } @@ -114,8 +107,7 @@ public class RocksIncrementalSnapshotStrategyTest { // construct RocksIncrementalSnapshotStrategy long lastCompletedCheckpointId = -1L; ResourceGuard rocksDBResourceGuard = new ResourceGuard(); - SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = - new TreeMap<>(); + SortedMap<Long, Collection<HandleAndLocalPath>> materializedSstFiles = new TreeMap<>(); LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java index 0ce99b2f1a9..f81a7cb93a1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StateHandleReuseITCase.java @@ -38,7 +38,7 @@ import org.junit.Test; import java.util.Map; -import static java.util.Collections.emptyMap; +import static 
java.util.Collections.emptyList; import static java.util.UUID.randomUUID; import static org.apache.flink.runtime.operators.lifecycle.command.TestCommand.FINISH_SOURCES; import static org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS; @@ -89,8 +89,8 @@ public class StateHandleReuseITCase extends AbstractTestBase { randomUUID(), EMPTY_KEY_GROUP_RANGE, 1L, - emptyMap(), - emptyMap(), + emptyList(), + emptyList(), new ByteStreamStateHandle("meta", new byte[] {0}), 0L);