This is an automated email from the ASF dual-hosted git repository. leiyanfei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 05b27be4112 [FLINK-34668][checkpoint] Report operator state handle of file merging directory to JM 05b27be4112 is described below commit 05b27be4112e1f5ca4c5614b1714a202c13343b6 Author: fredia <fredia...@gmail.com> AuthorDate: Mon Mar 18 14:05:52 2024 +0800 [FLINK-34668][checkpoint] Report operator state handle of file merging directory to JM --- .../filemerging/FileMergingSnapshotManager.java | 12 ++ .../FileMergingSnapshotManagerBase.java | 32 +++++ ...efaultOperatorStateBackendSnapshotStrategy.java | 27 +++- .../filemerging/DirectoryStreamStateHandle.java | 90 ++++++++++++ .../EmptyFileMergingOperatorStreamStateHandle.java | 64 +++++++++ .../filemerging/EmptySegmentFileStateHandle.java | 46 ++++++ .../FileMergingOperatorStreamStateHandle.java | 154 +++++++++++++++++++++ .../filemerging/SegmentFileStateHandle.java | 5 +- .../FileMergingCheckpointStateOutputStream.java | 2 +- .../FsMergingCheckpointStorageLocation.java | 11 ++ ...ssCheckpointFileMergingSnapshotManagerTest.java | 1 + .../FileMergingSnapshotManagerTestBase.java | 1 + ...inCheckpointFileMergingSnapshotManagerTest.java | 1 + .../runtime/state/OperatorStateBackendTest.java | 85 ++++++++++++ .../runtime/state/SharedStateRegistryTest.java | 86 ++++++++++++ ...FileMergingCheckpointStateOutputStreamTest.java | 2 +- .../FsMergingCheckpointStorageLocationTest.java | 2 +- 17 files changed, 614 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index cc54854a7a3..afc5eb618dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; @@ -117,6 +118,17 @@ public interface FileMergingSnapshotManager extends Closeable { */ Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope); + /** + * Get the {@link DirectoryStreamStateHandle} of the managed directory, created in {@link + * #initFileSystem} or {@link #registerSubtaskForSharedStates}. + * + * @param subtaskKey the subtask key identifying the subtask. + * @param scope the checkpoint scope. + * @return the {@link DirectoryStreamStateHandle} for one subtask in specified checkpoint scope. + */ + DirectoryStreamStateHandle getManagedDirStateHandle( + SubtaskKey subtaskKey, CheckpointedStateScope scope); + /** * Notifies the manager that the checkpoint with the given {@code checkpointId} completed and * was committed. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index 3dc9c788561..d46edcd235e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -27,6 +27,8 @@ import org.apache.flink.core.fs.OutputStreamAndPath; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import javax.annotation.concurrent.GuardedBy; +import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.util.HashSet; @@ -105,12 +108,24 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps */ private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>(); + /** + * The {@link DirectoryStreamStateHandle} for shared state directories, one for each subtask. + */ + private final Map<SubtaskKey, DirectoryStreamStateHandle> managedSharedStateDirHandles = + new ConcurrentHashMap<>(); + /** * The private state files are merged across subtasks, there is only one directory for * merged-files within one TM per job. 
*/ protected Path managedExclusiveStateDir; + /** + * The {@link DirectoryStreamStateHandle} for private state directory, one for each task + * manager. + */ + protected DirectoryStreamStateHandle managedExclusiveStateDirHandle; + public FileMergingSnapshotManagerBase( String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) { this.id = id; @@ -152,6 +167,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps Path managedExclusivePath = new Path(taskOwnedStateDir, id); createManagedDirectory(managedExclusivePath); this.managedExclusiveStateDir = managedExclusivePath; + this.managedExclusiveStateDirHandle = + DirectoryStreamStateHandle.forPathWithZeroSize( + new File(managedExclusivePath.getPath()).toPath()); this.writeBufferSize = writeBufferSize; } @@ -162,6 +180,10 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps if (!managedSharedStateDir.containsKey(subtaskKey)) { createManagedDirectory(managedPath); managedSharedStateDir.put(subtaskKey, managedPath); + managedSharedStateDirHandles.put( + subtaskKey, + DirectoryStreamStateHandle.forPathWithZeroSize( + new File(managedPath.getPath()).toPath())); } } @@ -477,6 +499,16 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps } } + @Override + public DirectoryStreamStateHandle getManagedDirStateHandle( + SubtaskKey subtaskKey, CheckpointedStateScope scope) { + if (scope.equals(CheckpointedStateScope.SHARED)) { + return managedSharedStateDirHandles.get(subtaskKey); + } else { + return managedExclusiveStateDirHandle; + } + } + static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) { // Currently, we do file sync regardless of the file system. // TODO: Determine whether do file sync more wisely. 
Add an interface to FileSystem if diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java index d81f7b3a1c0..793c8f3e9b3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java @@ -21,6 +21,9 @@ package org.apache.flink.runtime.state; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle; +import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle; +import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation; import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot; import org.apache.flink.util.CollectionUtil; @@ -121,7 +124,17 @@ class DefaultOperatorStateBackendSnapshotStrategy if (registeredBroadcastStatesDeepCopies.isEmpty() && registeredOperatorStatesDeepCopies.isEmpty()) { - return snapshotCloseableRegistry -> SnapshotResult.empty(); + if (streamFactory instanceof FsMergingCheckpointStorageLocation) { + FsMergingCheckpointStorageLocation location = + (FsMergingCheckpointStorageLocation) streamFactory; + return snapshotCloseableRegistry -> + SnapshotResult.of( + EmptyFileMergingOperatorStreamStateHandle.create( + location.getExclusiveStateHandle(), + location.getSharedStateHandle())); + } else { + return snapshotCloseableRegistry -> SnapshotResult.empty(); + } } return (snapshotCloseableRegistry) -> { @@ -204,7 +217,17 @@ class DefaultOperatorStateBackendSnapshotStrategy if (snapshotCloseableRegistry.unregisterCloseable(localOut)) { 
StreamStateHandle stateHandle = localOut.closeAndGetHandle(); if (stateHandle != null) { - retValue = new OperatorStreamStateHandle(writtenStatesMetaData, stateHandle); + retValue = + streamFactory instanceof FsMergingCheckpointStorageLocation + ? new FileMergingOperatorStreamStateHandle( + ((FsMergingCheckpointStorageLocation) streamFactory) + .getExclusiveStateHandle(), + ((FsMergingCheckpointStorageLocation) streamFactory) + .getSharedStateHandle(), + writtenStatesMetaData, + stateHandle) + : new OperatorStreamStateHandle( + writtenStatesMetaData, stateHandle); } return SnapshotResult.of(retValue); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java new file mode 100644 index 00000000000..dacebae4c10 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state.filemerging; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.DirectoryStateHandle; +import org.apache.flink.runtime.state.PhysicalStateHandleID; +import org.apache.flink.runtime.state.SharedStateRegistryKey; +import org.apache.flink.runtime.state.StreamStateHandle; + +import javax.annotation.Nonnull; + +import java.nio.file.Path; +import java.util.Optional; + +/** Wraps a {@link DirectoryStateHandle} as a {@link StreamStateHandle}. */ +public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle { + + private static final long serialVersionUID = 1L; + + public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) { + super(directory, directorySize); + } + + @Override + public FSDataInputStream openInputStream() { + throw new UnsupportedOperationException(); + } + + @Override + public Optional<byte[]> asBytesIfInMemory() { + return Optional.empty(); + } + + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + return new PhysicalStateHandleID(getDirectory().toString()); + } + + public SharedStateRegistryKey createStateRegistryKey() { + return new SharedStateRegistryKey(getDirectory().toUri().toString()); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o; + + return getDirectory().equals(that.getDirectory()); + } + + @Override + public String toString() { + return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + '}'; + } + + /** + * Returns a {@link DirectoryStreamStateHandle} with zero size. Such a handle is usually + * registered to {@link org.apache.flink.runtime.state.SharedStateRegistry} to track the life + * cycle of the directory, so a placeholder size of zero is used instead of the real one. + * + * @param directory the directory.
+ * @return a DirectoryStreamStateHandle for {@code directory} with a size of zero. + */ + public static DirectoryStreamStateHandle forPathWithZeroSize(@Nonnull Path directory) { + return new DirectoryStreamStateHandle(directory, 0); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java new file mode 100644 index 00000000000..6cbc9555e6d --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.state.filemerging; + +import org.apache.flink.runtime.state.OperatorStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.util.Collections; +import java.util.Map; + +/** + * An empty {@link FileMergingOperatorStreamStateHandle} that is only used as a placeholder to + * prevent the file merging directories from being deleted.
+ */ +public class EmptyFileMergingOperatorStreamStateHandle + extends FileMergingOperatorStreamStateHandle { + + private static final long serialVersionUID = 1L; + + public EmptyFileMergingOperatorStreamStateHandle( + DirectoryStreamStateHandle taskOwnedDirHandle, + DirectoryStreamStateHandle sharedDirHandle, + Map<String, StateMetaInfo> stateNameToPartitionOffsets, + StreamStateHandle delegateStateHandle) { + super( + taskOwnedDirHandle, + sharedDirHandle, + stateNameToPartitionOffsets, + delegateStateHandle); + } + + /** + * Creates a placeholder handle with no state metadata and an empty delegate. + * + * @param taskownedDirHandle handle of the task-owned directory holding operator state. + * @param sharedDirHandle handle of the shared file merging directory of the subtask. + */ + public static EmptyFileMergingOperatorStreamStateHandle create( + DirectoryStreamStateHandle taskownedDirHandle, + DirectoryStreamStateHandle sharedDirHandle) { + final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData = + Collections.emptyMap(); + return new EmptyFileMergingOperatorStreamStateHandle( + taskownedDirHandle, + sharedDirHandle, + writtenStatesMetaData, + EmptySegmentFileStateHandle.INSTANCE); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java new file mode 100644 index 00000000000..44b487fd53b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filemerging; + +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointedStateScope; + +import java.io.IOException; + +/** An empty {@link SegmentFileStateHandle} that is only used as a placeholder. */ +public class EmptySegmentFileStateHandle extends SegmentFileStateHandle { + private static final long serialVersionUID = 1L; + + public static final EmptySegmentFileStateHandle INSTANCE = + new EmptySegmentFileStateHandle( + new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE); + + private EmptySegmentFileStateHandle( + Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) { + super(filePath, startPos, stateSize, scope); + } + + /** Always throws, since this zero-size placeholder has no content to read. */ + @Override + public FSDataInputStream openInputStream() throws IOException { + throw new UnsupportedOperationException( + "Cannot open input stream from an EmptySegmentFileStateHandle."); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java new file mode 100644 index 00000000000..ada378f2d03 --- /dev/null +++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filemerging; + +import org.apache.flink.runtime.state.CompositeStateHandle; +import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.SharedStateRegistryKey; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Objects; + +/** + * An {@link OperatorStreamStateHandle} that works for file merging checkpoints. + * + * <p>Operator states are stored in `taskowned/` dir when file merging is enabled. When an operator + * state dir is not referenced by any checkpoint, {@link SharedStateRegistry} will discard it. The + * shared subtask dir of file merging is also tracked by {@link + * FileMergingOperatorStreamStateHandle}.
+ * + * <p>The shared subtask dir of file merging is created during task initialization and will be + * discarded when no checkpoint refers to it. + */ +public class FileMergingOperatorStreamStateHandle extends OperatorStreamStateHandle + implements CompositeStateHandle { + + private static final long serialVersionUID = 1L; + private static final Logger LOG = + LoggerFactory.getLogger(FileMergingOperatorStreamStateHandle.class); + + /** The directory handle of file merging under 'taskowned/', one per task manager. */ + private final DirectoryStreamStateHandle taskOwnedDirHandle; + + /** + * The directory handle of file merging under 'shared/', one for each subtask. + * + * @see org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager the layout of + * file merging checkpoint directory. + */ + private final DirectoryStreamStateHandle sharedDirHandle; + + private transient SharedStateRegistry sharedStateRegistry; + + public FileMergingOperatorStreamStateHandle( + DirectoryStreamStateHandle taskOwnedDirHandle, + DirectoryStreamStateHandle sharedDirHandle, + Map<String, StateMetaInfo> stateNameToPartitionOffsets, + StreamStateHandle delegateStateHandle) { + super(stateNameToPartitionOffsets, delegateStateHandle); + this.taskOwnedDirHandle = taskOwnedDirHandle; + this.sharedDirHandle = sharedDirHandle; + } + + @Override + public void registerSharedStates(SharedStateRegistry stateRegistry, long checkpointId) { + Preconditions.checkState( + sharedStateRegistry != stateRegistry, + "The state handle has already registered its shared states to the given registry."); + + sharedStateRegistry = Preconditions.checkNotNull(stateRegistry); + + LOG.trace( + "Registering FileMergingOperatorStreamStateHandle for checkpoint {} from backend.", + checkpointId); + stateRegistry.registerReference( + new SharedStateRegistryKey( + getDelegateStateHandle().getStreamStateHandleID().getKeyString()), + getDelegateStateHandle(), + checkpointId); + + stateRegistry.registerReference(
+ taskOwnedDirHandle.createStateRegistryKey(), taskOwnedDirHandle, checkpointId); + stateRegistry.registerReference( + sharedDirHandle.createStateRegistryKey(), sharedDirHandle, checkpointId); + } + + @Override + public void discardState() throws Exception { + SharedStateRegistry registry = this.sharedStateRegistry; + final boolean isRegistered = (registry != null); + + LOG.trace( + "Discarding FileMergingOperatorStreamStateHandle (registered = {}) from backend.", + isRegistered); + + try { + getDelegateStateHandle().discardState(); + } catch (Exception e) { + LOG.warn("Could not properly discard delegate state handle.", e); + } + } + + @Override + public long getCheckpointedSize() { + return getDelegateStateHandle().getStateSize(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + FileMergingOperatorStreamStateHandle that = (FileMergingOperatorStreamStateHandle) o; + + return super.equals(that) + && Objects.equals(taskOwnedDirHandle, that.taskOwnedDirHandle) + && Objects.equals(sharedDirHandle, that.sharedDirHandle); + } + + @Override + public int hashCode() { + int result = super.hashCode(); + result = 31 * result + Objects.hashCode(taskOwnedDirHandle); + result = 31 * result + Objects.hashCode(sharedDirHandle); + return result; + } + + @Override + public String toString() { + return "FileMergingOperatorStreamStateHandle{" + + super.toString() + + ", taskOwnedDirHandle=" + + taskOwnedDirHandle + + ", sharedDirHandle=" + + sharedDirHandle + + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java similarity index 96% rename from flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SegmentFileStateHandle.java rename to
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java index 0dce9304f78..2a3aa2f49fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SegmentFileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java @@ -15,11 +15,12 @@ * limitations under the License. */ -package org.apache.flink.runtime.checkpoint.filemerging; +package org.apache.flink.runtime.state.filemerging; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.PhysicalStateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; @@ -138,7 +139,7 @@ public class SegmentFileStateHandle implements StreamStateHandle { SegmentFileStateHandle that = (SegmentFileStateHandle) o; - return super.equals(that) + return filePath.equals(that.filePath) && startPos == that.startPos && stateSize == that.stateSize && scope.equals(that.scope); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java index b367d1724a9..688fad8c2ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java @@ -22,8 +22,8 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; -import 
org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.CheckpointStateOutputStream; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java index d485a3dfdfe..f367094bb8b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointStreamFactory; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import java.io.IOException; import java.util.List; @@ -84,6 +85,16 @@ public class FsMergingCheckpointStorageLocation extends FsCheckpointStorageLocat return backwardsConvertor.get(); } + public DirectoryStreamStateHandle getExclusiveStateHandle() { + return fileMergingSnapshotManager.getManagedDirStateHandle( + subtaskKey, CheckpointedStateScope.EXCLUSIVE); + } + + public DirectoryStreamStateHandle getSharedStateHandle() { + return fileMergingSnapshotManager.getManagedDirStateHandle( + subtaskKey, CheckpointedStateScope.SHARED); + } + @Override public boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope scope) throws IOException { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java index 475c13b5445..cc6c53793df 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint.filemerging; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.junit.jupiter.api.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index d6553060e2e..3147e668ef1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -26,6 +26,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java index 4128e5e7cd3..340bb917d41 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint.filemerging; import org.apache.flink.core.fs.CloseableRegistry; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.junit.jupiter.api.Test; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 7f70163ea29..2122303545b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -30,12 +30,21 @@ import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer; import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import 
org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle; +import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; +import org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; @@ -47,6 +56,7 @@ import org.apache.flink.util.concurrent.FutureUtils; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; @@ -64,6 +74,8 @@ import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.flink.core.fs.Path.fromLocalFile; +import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.mock; @@ -418,6 +430,62 @@ class OperatorStateBackendTest { assertThat(stateHandle).isNull(); } + @Test + void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws Exception { + final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); + CloseableRegistry cancelStreamRegistry = new CloseableRegistry(); + + Environment env = 
createMockEnvironment(); + final OperatorStateBackend operatorStateBackend = + abstractStateBackend.createOperatorStateBackend( + new OperatorStateBackendParametersImpl( + env, "testOperator", emptyStateHandles, cancelStreamRegistry)); + + Path checkpointBaseDir = new Path(tmpFolder.toString()); + Path sharedStateDir = + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR); + Path taskOwnedStateDir = + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR); + + final FileMergingSnapshotManager.SubtaskKey subtaskKey = + new FileMergingSnapshotManager.SubtaskKey("opId", 1, 1); + LocalFileSystem fs = getSharedInstance(); + CheckpointStorageLocationReference cslReference = + AbstractFsCheckpointStorageAccess.encodePathAsReference( + fromLocalFile(fs.pathToFile(checkpointBaseDir))); + FileMergingSnapshotManager snapshotManager = + createFileMergingSnapshotManager( + checkpointBaseDir, sharedStateDir, taskOwnedStateDir, subtaskKey); + CheckpointStreamFactory streamFactory = + new FsMergingCheckpointStorageLocation( + subtaskKey, + fs, + checkpointBaseDir, + sharedStateDir, + taskOwnedStateDir, + cslReference, + 1024, + 1024, + snapshotManager, + 0); + + RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot = + operatorStateBackend.snapshot( + 0L, + 0L, + streamFactory, + CheckpointOptions.forCheckpointWithDefaultLocation()); + + SnapshotResult<OperatorStateHandle> snapshotResult = + FutureUtils.runIfNotDoneAndGet(snapshot); + OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); + assertThat(stateHandle).isInstanceOf(FileMergingOperatorStreamStateHandle.class); + } + @Test void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception { final AbstractStateBackend abstractStateBackend = new MemoryStateBackend(4096); @@ -1047,4 +1115,21 @@ class OperatorStateBackendTest { new OperatorStateBackendParametersImpl( env, "testOperator", toRestore, 
new CloseableRegistry())); } + + private FileMergingSnapshotManager createFileMergingSnapshotManager( + Path checkpointBaseDir, + Path sharedStateDir, + Path taskOwnedStateDir, + SubtaskKey subtaskKey) { + FileMergingSnapshotManager mgr = + new FileMergingSnapshotManagerBuilder( + "test-1", FileMergingType.MERGE_WITHIN_CHECKPOINT) + .build(); + + mgr.initFileSystem( + getSharedInstance(), checkpointBaseDir, sharedStateDir, taskOwnedStateDir, 1024); + + mgr.registerSubtaskForSharedStates(subtaskKey); + return mgr; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java index 38553a29e9e..a26830c18be 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java @@ -22,23 +22,34 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.core.execution.RestoreMode; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.checkpoint.CheckpointProperties; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl; +import 
org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle; +import org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle; +import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.File; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Optional; import java.util.UUID; +import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance; import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION; import static org.apache.flink.runtime.state.ChangelogTestUtils.ChangelogStateHandleWrapper; @@ -255,6 +266,81 @@ class SharedStateRegistryTest { .containsExactly(1L); } + @Test + void testFireMergingOperatorStateRegister(@TempDir File tmpFolder) throws IOException { + SharedStateRegistry sharedStateRegistry = new SharedStateRegistryImpl(); + + Path checkpointBaseDir = new Path(tmpFolder.toString()); + Path sharedStateDir = + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR); + Path taskOwnedStateDir = + new Path( + checkpointBaseDir, + AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR); + final FileMergingSnapshotManager.SubtaskKey subtaskKey = + new FileMergingSnapshotManager.SubtaskKey("opId", 1, 2); + FileMergingSnapshotManager snapshotManager = + createFileMergingSnapshotManager( + checkpointBaseDir, sharedStateDir, taskOwnedStateDir); + snapshotManager.registerSubtaskForSharedStates(subtaskKey); + FileMergingOperatorStreamStateHandle handle1 = + 
EmptyFileMergingOperatorStreamStateHandle.create( + snapshotManager.getManagedDirStateHandle( + subtaskKey, CheckpointedStateScope.EXCLUSIVE), + snapshotManager.getManagedDirStateHandle( + subtaskKey, CheckpointedStateScope.SHARED)); + + handle1.registerSharedStates(sharedStateRegistry, 1); + LocalFileSystem fs = getSharedInstance(); + assertThat( + fs.exists( + snapshotManager.getManagedDir( + subtaskKey, CheckpointedStateScope.EXCLUSIVE))) + .isTrue(); + assertThat( + fs.exists( + snapshotManager.getManagedDir( + subtaskKey, CheckpointedStateScope.SHARED))) + .isTrue(); + sharedStateRegistry.checkpointCompleted(1); + sharedStateRegistry.unregisterUnusedState(1); + assertThat( + fs.exists( + snapshotManager.getManagedDir( + subtaskKey, CheckpointedStateScope.EXCLUSIVE))) + .isTrue(); + assertThat( + fs.exists( + snapshotManager.getManagedDir( + subtaskKey, CheckpointedStateScope.SHARED))) + .isTrue(); + sharedStateRegistry.unregisterUnusedState(2); + assertThat( + fs.exists( + snapshotManager.getManagedDir( + subtaskKey, CheckpointedStateScope.EXCLUSIVE))) + .isFalse(); + assertThat( + fs.exists( + snapshotManager.getManagedDir( + subtaskKey, CheckpointedStateScope.SHARED))) + .isFalse(); + } + + private FileMergingSnapshotManager createFileMergingSnapshotManager( + Path checkpointBaseDir, Path sharedStateDir, Path taskOwnedStateDir) { + FileMergingSnapshotManager mgr = + new FileMergingSnapshotManagerBuilder( + "test-1", FileMergingType.MERGE_WITHIN_CHECKPOINT) + .build(); + mgr.initFileSystem( + getSharedInstance(), checkpointBaseDir, sharedStateDir, taskOwnedStateDir, 1024); + + return mgr; + } + private void registerInitialCheckpoint( SharedStateRegistry sharedStateRegistry, String stateId, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java index 
837bbb1efbb..fba8653bdcd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java @@ -25,9 +25,9 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.OutputStreamAndPath; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile; -import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.util.Preconditions; import org.junit.Before; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java index d7796cf7f4f..1eb0be9e2ab 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java @@ -25,9 +25,9 @@ import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType; -import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import 
org.junit.Before; import org.junit.Rule;