This is an automated email from the ASF dual-hosted git repository. yuanmei pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 01560a154fb [FLINK-32071][FLIP-306] Implement the snapshot manager for merged checkpoint files in TM 01560a154fb is described below commit 01560a154fba41af4a9354da2588f78ea98089b7 Author: Zakelly <zakelly....@gmail.com> AuthorDate: Tue Jun 20 14:56:32 2023 +0800 [FLINK-32071][FLIP-306] Implement the snapshot manager for merged checkpoint files in TM --- .../filemerging/FileMergingSnapshotManager.java | 175 +++++++++++ .../FileMergingSnapshotManagerBase.java | 337 +++++++++++++++++++++ .../FileMergingSnapshotManagerBuilder.java | 64 ++++ .../checkpoint/filemerging/LogicalFile.java | 171 +++++++++++ .../checkpoint/filemerging/PhysicalFile.java | 210 +++++++++++++ ...WithinCheckpointFileMergingSnapshotManager.java | 101 ++++++ .../FileMergingSnapshotManagerTest.java | 277 +++++++++++++++++ 7 files changed, 1335 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java new file mode 100644 index 00000000000..4954d8548f3 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.api.common.TaskInfo; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.TaskStateManager; +import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; + +import java.io.Closeable; + +/** + * FileMergingSnapshotManager provides an interface to manage files and meta information for + * checkpoint files with merging checkpoint files enabled. It manages the files for ONE single task + * in TM, including all subtasks of this single task that are running in this TM. There is one + * FileMergingSnapshotManager for each task per task manager. + * + * <p>TODO (FLINK-32073): create output stream. + * + * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical files. + */ +public interface FileMergingSnapshotManager extends Closeable { + + /** + * Initialize the file system, recording the checkpoint path the manager should work with.
+ * + * <pre> + * The layout of checkpoint directory: + * /user-defined-checkpoint-dir + * /{job-id} (checkpointBaseDir) + * | + * + --shared/ + * | + * + --subtask-1/ + * + -- merged shared state files + * + --subtask-2/ + * + -- merged shared state files + * + --taskowned/ + * + -- merged private state files + * + --chk-1/ + * + --chk-2/ + * + --chk-3/ + * </pre> + * + * <p>The reason for initializing directories in this method instead of the constructor is that + * the FileMergingSnapshotManager itself belongs to the {@link TaskStateManager}, which is + * initialized when receiving a task, while the base directories for checkpoint are created by + * {@link FsCheckpointStorageAccess} when the state backend is initialized per subtask. After the + * checkpoint directories are initialized, the managed subdirectories are initialized here. + * + * <p>Note: This method may be called several times, the implementation should ensure + * idempotency, and throw {@link IllegalArgumentException} when any of the paths in params changes + * across function calls. + * + * @param fileSystem The filesystem to write to. + * @param checkpointBaseDir The base directory for checkpoints. + * @param sharedStateDir The directory for shared checkpoint data. + * @param taskOwnedStateDir The directory for state not owned/released by the + * master, but by the TaskManagers. + * @throws IllegalArgumentException thrown if these three paths are not deterministic across + * calls. + */ + void initFileSystem( + FileSystem fileSystem, + Path checkpointBaseDir, + Path sharedStateDir, + Path taskOwnedStateDir) + throws IllegalArgumentException; + + /** + * Register a subtask and create the managed directory for shared states. + * + * @param subtaskKey the subtask key identifying a subtask. + * @see #initFileSystem for layout information.
+ */ + void registerSubtaskForSharedStates(SubtaskKey subtaskKey); + + /** + * Get the managed directory of the file-merging snapshot manager, created in {@link + * #initFileSystem} or {@link #registerSubtaskForSharedStates}. + * + * @param subtaskKey the subtask key identifying the subtask. + * @param scope the checkpoint scope. + * @return the managed directory for one subtask in the specified checkpoint scope. + */ + Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope); + + /** + * A key that identifies a subtask. A subtask can be identified by the operator id, subtask index and + * the parallelism. Note that this key should be consistent across job attempts. + */ + final class SubtaskKey { + final String operatorIDString; + final int subtaskIndex; + final int parallelism; + + /** + * The cached hash code. Since instances of SubtaskKey are used in HashMap as keys, the cached + * hash code may help improve performance. + */ + final int hashCode; + + public SubtaskKey(OperatorID operatorID, TaskInfo taskInfo) { + this( + operatorID.toHexString(), + taskInfo.getIndexOfThisSubtask(), + taskInfo.getNumberOfParallelSubtasks()); + } + + SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) { + this.operatorIDString = operatorIDString; + this.subtaskIndex = subtaskIndex; + this.parallelism = parallelism; + int hash = operatorIDString.hashCode(); + hash = 31 * hash + subtaskIndex; + hash = 31 * hash + parallelism; + this.hashCode = hash; + } + + /** + * Generate a unique managed directory name for one subtask. + * + * @return the managed directory name.
+ */ + public String getManagedDirName() { + return String.format("%s_%d_%d_", operatorIDString, subtaskIndex, parallelism) + .replaceAll("[^a-zA-Z0-9\\-]", "_"); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + SubtaskKey that = (SubtaskKey) o; + + return hashCode == that.hashCode + && subtaskIndex == that.subtaskIndex + && parallelism == that.parallelism + && operatorIDString.equals(that.operatorIDString); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public String toString() { + return String.format("%s(%d/%d)", operatorIDString, subtaskIndex, parallelism); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java new file mode 100644 index 00000000000..e775c0fed64 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.core.fs.EntropyInjector; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.FileStatus; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.OutputStreamAndPath; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; + +import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter; + +/** Base implementation of {@link FileMergingSnapshotManager}. */ +public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager { + + private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class); + + /** The identifier of this manager. */ + private final String id; + + /** The executor for I/O operations in this manager. */ + protected final Executor ioExecutor; + + /** The {@link FileSystem} that this manager works on. */ + protected FileSystem fs; + + // checkpoint directories + protected Path checkpointDir; + protected Path sharedStateDir; + protected Path taskOwnedStateDir; + + /** + * The file system should only be initialized once. + * + * @see FileMergingSnapshotManager#initFileSystem for the reason why this guard is needed. + */ + private boolean fileSystemInitiated = false; + + /** + * File-system dependent value. Marks whether the file system this manager is running on needs a sync + * for visibility.
If true, DO a file sync after writing each segment. + */ + protected boolean shouldSyncAfterClosingLogicalFile; + + protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile; + + /** + * Currently the shared state files are merged within each subtask; files are split into different + * directories. + */ + private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>(); + + /** + * The private state files are merged across subtasks; there is only one directory for + * merged files within one TM per job. + */ + protected Path managedExclusiveStateDir; + + public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) { + this.id = id; + this.ioExecutor = ioExecutor; + } + + @Override + public void initFileSystem( + FileSystem fileSystem, + Path checkpointBaseDir, + Path sharedStateDir, + Path taskOwnedStateDir) + throws IllegalArgumentException { + if (fileSystemInitiated) { + Preconditions.checkArgument( + checkpointBaseDir.equals(this.checkpointDir), + "The checkpoint base dir is not deterministic across subtasks."); + Preconditions.checkArgument( + sharedStateDir.equals(this.sharedStateDir), + "The shared checkpoint dir is not deterministic across subtasks."); + Preconditions.checkArgument( + taskOwnedStateDir.equals(this.taskOwnedStateDir), + "The task-owned checkpoint dir is not deterministic across subtasks."); + return; + } + this.fs = fileSystem; + this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir); + this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir); + this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir); + this.fileSystemInitiated = true; + this.shouldSyncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(fileSystem); + // Initialize the managed exclusive path using id as the child path name. + // Currently, we use the task-owned directory to place the merged private state.
According + // to FLIP-306, we may later move these files to the newly introduced + // task-manager-owned directory. + Path managedExclusivePath = new Path(taskOwnedStateDir, id); + createManagedDirectory(managedExclusivePath); + this.managedExclusiveStateDir = managedExclusivePath; + } + + @Override + public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) { + String managedDirName = subtaskKey.getManagedDirName(); + Path managedPath = new Path(sharedStateDir, managedDirName); + if (!managedSharedStateDir.containsKey(subtaskKey)) { + createManagedDirectory(managedPath); + managedSharedStateDir.put(subtaskKey, managedPath); + } + } + + // ------------------------------------------------------------------------ + // logical & physical file + // ------------------------------------------------------------------------ + + /** + * Create a logical file on a physical file. + * + * @param physicalFile the underlying physical file. + * @param startOffset the offset of the physical file that the logical file starts from. + * @param length the length of the logical file. + * @param subtaskKey the id of the subtask that the logical file belongs to. + * @return the created logical file. + */ + protected LogicalFile createLogicalFile( + @Nonnull PhysicalFile physicalFile, + int startOffset, + int length, + @Nonnull SubtaskKey subtaskKey) { + LogicalFileId fileID = LogicalFileId.generateRandomId(); + return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey); + } + + /** + * Create a physical file in the right location (managed directory), which is specified by the scope of + * this checkpoint and the current subtask. + * + * @param subtaskKey the {@link SubtaskKey} of the current subtask. + * @param scope the checkpoint scope. + * @return the created physical file. + * @throws IOException if anything goes wrong with file system.
+ */ + @Nonnull + protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope) + throws IOException { + PhysicalFile result; + Exception latestException = null; + + Path dirPath = getManagedDir(subtaskKey, scope); + + if (dirPath == null) { + throw new IOException( + "Could not get " + + scope + + " path for subtask " + + subtaskKey + + ", the directory may have not been created."); + } + + for (int attempt = 0; attempt < 10; attempt++) { + try { + OutputStreamAndPath streamAndPath = + EntropyInjector.createEntropyAware( + fs, + generatePhysicalFilePath(dirPath), + FileSystem.WriteMode.NO_OVERWRITE); + FSDataOutputStream outputStream = streamAndPath.stream(); + Path filePath = streamAndPath.path(); + result = new PhysicalFile(outputStream, filePath, this.physicalFileDeleter, scope); + updateFileCreationMetrics(filePath); + return result; + } catch (Exception e) { + latestException = e; + } + } + + throw new IOException( + "Could not open output stream for state file merging.", latestException); + } + + private void updateFileCreationMetrics(Path path) { + // TODO: FLINK-32091 add io metrics + LOG.debug("Create a new physical file {} for checkpoint file merging.", path); + } + + /** + * Generate a file path for a physical file. + * + * @param dirPath the parent directory path for the physical file. + * @return the generated file path for a physical file. + */ + protected Path generatePhysicalFilePath(Path dirPath) { + // this must be called after initFileSystem() is called + // so the checkpoint directories must not be null if we reach here + final String fileName = UUID.randomUUID().toString(); + return new Path(dirPath, fileName); + } + + /** + * Delete a physical file by the given file path. It uses the I/O executor to do the deletion. + * + * @param filePath the given file path to delete.
+ */ + protected final void deletePhysicalFile(Path filePath) { + ioExecutor.execute( + () -> { + try { + fs.delete(filePath, false); + LOG.debug("Physical file deleted: {}.", filePath); + } catch (IOException e) { + LOG.warn("Fail to delete file: {}", filePath); /* NOTE(review): the caught IOException e is not passed to LOG.warn, so the failure cause is lost; consider adding e as the last argument. */ + } + }); + } + + // ------------------------------------------------------------------------ + // abstract methods + // ------------------------------------------------------------------------ + + /** + * Get a reused physical file or create one. This will be called in checkpoint output stream + * creation logic. + * + * <p>TODO (FLINK-32073): Implement a CheckpointStreamFactory for file-merging that uses this + * method to create or reuse physical files. + * + * <p>Basic logic of file reusing: whenever a physical file is needed, this method is called + * with necessary information provided for acquiring a file. The file will not be reused until + * it is written and returned to the reuse pool by calling {@link + * #returnPhysicalFileForNextReuse}. + * + * @param subtaskKey the subtask key for the caller + * @param checkpointId the checkpoint id + * @param scope checkpoint scope + * @return the requested physical file. + * @throws IOException thrown if anything goes wrong with file system. + */ + @Nonnull + protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint( + SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) + throws IOException; + + /** + * Try to return an existing physical file to the manager for next reuse. If this physical file + * is no longer needed (for reusing), it will be closed. + * + * <p>Basic logic of file reusing, see {@link #getOrCreatePhysicalFileForCheckpoint}. + * + * @param subtaskKey the subtask key for the caller + * @param checkpointId in which checkpoint this physical file is requested. + * @param physicalFile the physical file being returned + * @throws IOException thrown if anything goes wrong with file system.
+ * @see #getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope) + */ + protected abstract void returnPhysicalFileForNextReuse( + SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException; + + // ------------------------------------------------------------------------ + // file system + // ------------------------------------------------------------------------ + + @Override + public Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope) { + if (scope.equals(CheckpointedStateScope.SHARED)) { + return managedSharedStateDir.get(subtaskKey); + } else { + return managedExclusiveStateDir; + } + } + + static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) { + // Currently, we do file sync regardless of the file system. + // TODO: Determine more wisely whether to do a file sync. Add an interface to FileSystem if + // needed. + return true; + } + + // ------------------------------------------------------------------------ + // utilities + // ------------------------------------------------------------------------ + + private void createManagedDirectory(Path managedPath) { + try { + FileStatus fileStatus = null; + try { + fileStatus = fs.getFileStatus(managedPath); + } catch (FileNotFoundException e) { + // expected exception when the path does not exist, and we ignore it. + } + if (fileStatus == null) { + fs.mkdirs(managedPath); + LOG.info("Created a directory {} for checkpoint file-merging.", managedPath); + } else if (fileStatus.isDir()) { + LOG.info("Reusing previous directory {} for checkpoint file-merging.", managedPath); + } else { + throw new FlinkRuntimeException( + "The managed path " + + managedPath + + " for file-merging is occupied by another file.
Cannot create directory."); + } + } catch (IOException e) { + throw new FlinkRuntimeException( + "Cannot create directory " + managedPath + " for file-merging ", e); + } + } + + @Override + public void close() throws IOException {} +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java new file mode 100644 index 00000000000..f07669b094b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import javax.annotation.Nullable; + +import java.util.concurrent.Executor; + +/** A builder that builds the {@link FileMergingSnapshotManager}. */ +public class FileMergingSnapshotManagerBuilder { + + /** The id for identifying a {@link FileMergingSnapshotManager}. */ + private final String id; + + @Nullable private Executor ioExecutor = null; + + /** + * Initialize the builder. + * + * @param id the id of the manager.
+ */ + public FileMergingSnapshotManagerBuilder(String id) { + this.id = id; + } + + /** + * Set the executor for I/O operations in the manager. If null (default), all I/O operations will be + * executed synchronously. + */ + public FileMergingSnapshotManagerBuilder setIOExecutor(@Nullable Executor ioExecutor) { + this.ioExecutor = ioExecutor; + return this; + } + + /** + * Create a file-merging snapshot manager (currently always a WithinCheckpointFileMergingSnapshotManager). + * + * <p>TODO (FLINK-32072): Create manager during the initialization of task manager services. + * + * <p>TODO (FLINK-32074): Support another type of FileMergingSnapshotManager that merges files + * across different checkpoints. + * + * @return the created manager. + */ + public FileMergingSnapshotManager build() { + return new WithinCheckpointFileMergingSnapshotManager( + id, ioExecutor == null ? Runnable::run : ioExecutor); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java new file mode 100644 index 00000000000..1a113d6b4b8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.StringBasedID; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.UUID; + +import static org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; + +/** + * An abstraction of logical files in file-merging checkpoints. It stands for a data segment, that + * is to say a single file before file-merging. + */ +public class LogicalFile { + + /** ID for {@link LogicalFile}. It should be unique for each file. */ + public static class LogicalFileId extends StringBasedID { + + public LogicalFileId(String keyString) { + super(keyString); + } + + public Path getFilePath() { + return new Path(getKeyString()); + } + + public static LogicalFileId generateRandomId() { + return new LogicalFileId(UUID.randomUUID().toString()); + } + } + + /** ID for this file. */ + LogicalFileId fileId; + + /** + * The id of the last checkpoint that uses this logical file. This acts as a watermark + * determining whether this logical file could be removed. + * + * @see #discardWithCheckpointId(long) + * @see #advanceLastCheckpointId(long) + */ + private long lastUsedCheckpointID = -1L; + + /** Whether this logical file is removed by checkpoint subsumption/abortion. */ + boolean discarded = false; + + /** The physical file where this logical file is stored. This should never be null. */ + @Nonnull private final PhysicalFile physicalFile; + + /** The offset of the physical file that this logical file starts from. */ + private final int startOffset; + + /** The length of this logical file. */ + private final int length; + + /** The id of the subtask that this logical file belongs to.
*/ + @Nonnull private final SubtaskKey subtaskKey; + + public LogicalFile( + LogicalFileId fileId, + @Nonnull PhysicalFile physicalFile, + int startOffset, + int length, + @Nonnull SubtaskKey subtaskKey) { + this.fileId = fileId; + this.physicalFile = physicalFile; + this.startOffset = startOffset; + this.length = length; + this.subtaskKey = subtaskKey; + physicalFile.incRefCount(); + } + + public LogicalFileId getFileId() { + return fileId; + } + + /** + * A logical file may be shared across checkpoints (especially for shared state). When this logical + * file is used/reused by a checkpoint, update the last checkpoint id that uses this logical + * file. + * + * @param checkpointId the checkpoint that uses this logical file. + */ + public void advanceLastCheckpointId(long checkpointId) { + if (checkpointId > lastUsedCheckpointID) { + this.lastUsedCheckpointID = checkpointId; + } + } + + /** + * When a checkpoint that uses this logical file is subsumed or aborted, discard this logical + * file. If this file is used by a later checkpoint, the file should not be discarded. Note that + * the removal of a logical file may cause the deletion of its physical file. + * + * @param checkpointId the checkpoint that is notified as subsumed or aborted. + * @throws IOException if anything goes wrong with file system.
+ */ + public void discardWithCheckpointId(long checkpointId) throws IOException { + if (!discarded && checkpointId >= lastUsedCheckpointID) { + physicalFile.decRefCount(); + discarded = true; + } + } + + public long getLastUsedCheckpointID() { + return lastUsedCheckpointID; + } + + @Nonnull + public PhysicalFile getPhysicalFile() { + return physicalFile; + } + + public int getStartOffset() { + return startOffset; + } + + public int getLength() { + return length; + } + + @Nonnull + public SubtaskKey getSubtaskKey() { + return subtaskKey; + } + + @VisibleForTesting + public boolean isDiscarded() { + return discarded; + } + + @Override + public int hashCode() { + return fileId.hashCode(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LogicalFile that = (LogicalFile) o; + return fileId.equals(that.fileId); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java new file mode 100644 index 00000000000..2709aaabbd7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +/** An abstraction of physical files in file-merging checkpoints. */ +public class PhysicalFile { + + private static final Logger LOG = LoggerFactory.getLogger(PhysicalFile.class); + + /** Functional interface to delete the physical file. */ + @FunctionalInterface + public interface PhysicalFileDeleter { + /** Delete the file. */ + void perform(Path filePath) throws IOException; + } + + /** + * Output stream to the file, which is kept open for writing. It can be null if the file is + * closed. + */ + @Nullable private FSDataOutputStream outputStream; + + /** Reference count from the logical files. */ + private final AtomicInteger logicalFileRefCount; + + /** The size of this physical file. */ + private final AtomicLong size; + + /** + * Deleter that will be called when deleting this physical file. If null, do not delete this + * physical file.
+ */ + @Nullable private final PhysicalFileDeleter deleter; + + private final Path filePath; + + private final CheckpointedStateScope scope; + + /** + * If a physical file is closed, it means no more file segments will be written to the physical + * file, and it can be deleted once its logicalFileRefCount decreases to 0. + */ + private boolean closed; + + /** + * A file can be deleted if: 1. It is closed, and 2. No more {@link LogicalFile}s hold a reference + * to it. + */ + private boolean deleted = false; + + public PhysicalFile( + @Nullable FSDataOutputStream outputStream, + Path filePath, + @Nullable PhysicalFileDeleter deleter, + CheckpointedStateScope scope) { + this.filePath = filePath; + this.outputStream = outputStream; + this.closed = outputStream == null; + this.deleter = deleter; + this.scope = scope; + this.size = new AtomicLong(0); + this.logicalFileRefCount = new AtomicInteger(0); + } + + @Nullable + public FSDataOutputStream getOutputStream() { + return outputStream; + } + + void incRefCount() { + int newValue = this.logicalFileRefCount.incrementAndGet(); + LOG.trace( + "Increase the reference count of physical file: {} by 1. New value is: {}.", + this.filePath, + newValue); + } + + void decRefCount() throws IOException { + Preconditions.checkArgument(logicalFileRefCount.get() > 0); /* NOTE(review): the check and the following decrement are two separate atomic operations, so together they are not atomic; also this guards object state, which Preconditions.checkState would express more accurately than checkArgument. */ + int newValue = this.logicalFileRefCount.decrementAndGet(); + LOG.trace( + "Decrease the reference count of physical file: {} by 1. New value is: {}. ", + this.filePath, + newValue); + deleteIfNecessary(); + } + + /** + * Delete this physical file if there is no reference count from logical files (all discarded), + * and this physical file is closed (no further writing on it). + * + * @throws IOException if anything goes wrong with file system.
+ */ + public void deleteIfNecessary() throws IOException { + synchronized (this) { + if (!isOpen() && !deleted && this.logicalFileRefCount.get() <= 0) { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + LOG.warn("Fail to close output stream when deleting file: {}", filePath); + } + } + if (deleter != null) { + deleter.perform(filePath); + } + this.deleted = true; + } + } + } + + void incSize(long delta) { + this.size.addAndGet(delta); + } + + long getSize() { + return size.get(); + } + + @VisibleForTesting + int getRefCount() { + return logicalFileRefCount.get(); + } + + public void close() throws IOException { + innerClose(); + deleteIfNecessary(); + } + + /** + * Close the physical file so that it is no longer reused. + * + * @throws IOException if anything goes wrong with file system. + */ + private void innerClose() throws IOException { + closed = true; + if (outputStream != null) { + outputStream.close(); + outputStream = null; + } + } + + /** @return whether this physical file is still open for writing.
*/ + public boolean isOpen() { + return !closed && outputStream != null; + } + + public boolean isDeleted() { + return deleted; + } + + public Path getFilePath() { + return filePath; + } + + public CheckpointedStateScope getScope() { + return scope; + } + + @Override + public boolean equals(Object o) { /* NOTE(review): equals() is overridden (by filePath) but hashCode() is not, which violates the Object equals/hashCode contract for hash-based collections; a matching hashCode() returning filePath.hashCode() is expected. */ + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + PhysicalFile that = (PhysicalFile) o; + return filePath.equals(that.filePath); + } + + @Override + public String toString() { + return String.format( + "Physical File: [%s], closed: %s, logicalFileRefCount: %d", + filePath, closed, logicalFileRefCount.get()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java new file mode 100644 index 00000000000..e8e79f0c052 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.checkpoint.filemerging; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.runtime.state.CheckpointedStateScope; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; + +/** A {@link FileMergingSnapshotManager} that merging files within a checkpoint. */ +public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase { + + /** A dummy subtask key to reuse files among subtasks for private states. */ + private static final SubtaskKey DUMMY_SUBTASK_KEY = new SubtaskKey("dummy", -1, -1); + + /** + * For WITHIN_BOUNDARY mode, physical files are NOT shared among multiple checkpoints. This map + * contains all physical files that are still writable and not occupied by a writer. The key of + * this map consist of checkpoint id, subtask key, and checkpoint scope, which collectively + * determine the physical file to be reused. + */ + private final Map<Tuple3<Long, SubtaskKey, CheckpointedStateScope>, PhysicalFile> + writablePhysicalFilePool; + + public WithinCheckpointFileMergingSnapshotManager(String id, Executor ioExecutor) { + // currently there is no file size limit For WITHIN_BOUNDARY mode + super(id, ioExecutor); + writablePhysicalFilePool = new HashMap<>(); + } + + @Override + @Nonnull + protected PhysicalFile getOrCreatePhysicalFileForCheckpoint( + SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope) + throws IOException { + // TODO: FLINK-32076 will add a file pool for each subtask key. + Tuple3<Long, SubtaskKey, CheckpointedStateScope> fileKey = + Tuple3.of( + checkpointId, + scope == CheckpointedStateScope.SHARED ? 
subtaskKey : DUMMY_SUBTASK_KEY, + scope); + PhysicalFile file; + synchronized (writablePhysicalFilePool) { + file = writablePhysicalFilePool.remove(fileKey); + if (file == null) { + file = createPhysicalFile(subtaskKey, scope); + } + } + return file; + } + + @Override + protected void returnPhysicalFileForNextReuse( + SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) + throws IOException { + // TODO: FLINK-32076 will add a file pool for reusing. + CheckpointedStateScope scope = physicalFile.getScope(); + Tuple3<Long, SubtaskKey, CheckpointedStateScope> fileKey = + Tuple3.of( + checkpointId, + scope == CheckpointedStateScope.SHARED ? subtaskKey : DUMMY_SUBTASK_KEY, + scope); + PhysicalFile current; + synchronized (writablePhysicalFilePool) { + current = writablePhysicalFilePool.putIfAbsent(fileKey, physicalFile); + } + // TODO: We sync the file when return to the reuse pool for safety. Actually it could be + // optimized after FLINK-32075. + if (shouldSyncAfterClosingLogicalFile) { + FSDataOutputStream os = physicalFile.getOutputStream(); + if (os != null) { + os.sync(); + } + } + if (current != physicalFile) { + physicalFile.close(); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java new file mode 100644 index 00000000000..c8299575ae8 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
 */

package org.apache.flink.runtime.checkpoint.filemerging;

import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link FileMergingSnapshotManager}. */
class FileMergingSnapshotManagerTest {

    private final String tmId = "Testing";

    private final OperatorID operatorID = new OperatorID(289347923L, 75893479L);

    private SubtaskKey subtaskKey1;
    private SubtaskKey subtaskKey2;

    private Path checkpointBaseDir;

    @BeforeEach
    void setup(@TempDir java.nio.file.Path tempFolder) {
        // use simplified job ids for the tests
        long jobId = 1;
        subtaskKey1 = new SubtaskKey(operatorID, new TaskInfo("TestingTask", 128, 0, 128, 3));
        subtaskKey2 = new SubtaskKey(operatorID, new TaskInfo("TestingTask", 128, 1, 128, 3));
        checkpointBaseDir = new Path(tempFolder.toString(), String.valueOf(jobId));
    }

    @Test
    void testCreateFileMergingSnapshotManager() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(checkpointBaseDir)) {
            fmsm.registerSubtaskForSharedStates(subtaskKey1);
            assertThat(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE))
                    .isEqualTo(
                            new Path(
                                    checkpointBaseDir,
                                    AbstractFsCheckpointStorageAccess
                                                    .CHECKPOINT_TASK_OWNED_STATE_DIR
                                            + "/"
                                            + tmId));
            assertThat(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED))
                    .isEqualTo(
                            new Path(
                                    checkpointBaseDir,
                                    AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR
                                            + "/"
                                            + subtaskKey1.getManagedDirName()));
        }
    }

    @Test
    void testCreateAndReuseFiles() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(checkpointBaseDir)) {
            fmsm.registerSubtaskForSharedStates(subtaskKey1);
            fmsm.registerSubtaskForSharedStates(subtaskKey2);
            // firstly, we try shared state.
            PhysicalFile file1 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
            assertThat(file1.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
            // allocate another
            PhysicalFile file2 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
            assertThat(file2.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
            assertThat(file2).isNotEqualTo(file1);

            // return for reuse
            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);

            // allocate for another subtask
            PhysicalFile file3 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey2, 0, CheckpointedStateScope.SHARED);
            assertThat(file3.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED));
            assertThat(file3).isNotEqualTo(file1);

            // allocate for another checkpoint
            PhysicalFile file4 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 1, CheckpointedStateScope.SHARED);
            assertThat(file4.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
            assertThat(file4).isNotEqualTo(file1);

            // allocate for this checkpoint
            PhysicalFile file5 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
            assertThat(file5.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
            assertThat(file5).isEqualTo(file1);

            // Secondly, we try private state
            PhysicalFile file6 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
            assertThat(file6.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));

            // allocate another
            PhysicalFile file7 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
            assertThat(file7.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            assertThat(file7).isNotEqualTo(file6);

            // return for reuse
            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6);

            // allocate for another checkpoint
            PhysicalFile file8 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
            assertThat(file8.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
            assertThat(file8).isNotEqualTo(file6);

            // allocate for this checkpoint but another subtask
            PhysicalFile file9 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE);
            assertThat(file9.getFilePath().getParent())
                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
            assertThat(file9).isEqualTo(file6);

            assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE))
                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
        }
    }

    @Test
    void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(checkpointBaseDir)) {
            fmsm.registerSubtaskForSharedStates(subtaskKey1);
            fmsm.registerSubtaskForSharedStates(subtaskKey2);

            PhysicalFile physicalFile1 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
            assertThat(physicalFile1.isOpen()).isTrue();

            LogicalFile logicalFile1 = fmsm.createLogicalFile(physicalFile1, 0, 10, subtaskKey1);
            assertThat(logicalFile1.getSubtaskKey()).isEqualTo(subtaskKey1);
            assertThat(logicalFile1.getPhysicalFile()).isEqualTo(physicalFile1);
            assertThat(logicalFile1.getStartOffset()).isEqualTo(0);
            assertThat(logicalFile1.getLength()).isEqualTo(10);
            assertThat(physicalFile1.getRefCount()).isEqualTo(1);

            assertThat(logicalFile1.isDiscarded()).isFalse();
            logicalFile1.advanceLastCheckpointId(2);
            assertThat(logicalFile1.getLastUsedCheckpointID()).isEqualTo(2);
            logicalFile1.advanceLastCheckpointId(1);
            assertThat(logicalFile1.getLastUsedCheckpointID()).isEqualTo(2);
            logicalFile1.discardWithCheckpointId(1);
            assertThat(logicalFile1.isDiscarded()).isFalse();
            logicalFile1.discardWithCheckpointId(2);
            assertThat(logicalFile1.isDiscarded()).isTrue();

            // the stream is still open for reuse
            assertThat(physicalFile1.isOpen()).isTrue();
            assertThat(physicalFile1.isDeleted()).isFalse();
            assertThat(physicalFile1.getRefCount()).isEqualTo(0);

            physicalFile1.close();
            assertThat(physicalFile1.isOpen()).isFalse();
            assertThat(physicalFile1.isDeleted()).isTrue();

            // try close physical file but not deleted
            PhysicalFile physicalFile2 =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
            LogicalFile logicalFile2 = fmsm.createLogicalFile(physicalFile2, 0, 10, subtaskKey1);
            assertThat(logicalFile2.getPhysicalFile()).isEqualTo(physicalFile2);
            assertThat(logicalFile2.getStartOffset()).isEqualTo(0);
            assertThat(logicalFile2.getLength()).isEqualTo(10);
            assertThat(physicalFile2.getRefCount()).isEqualTo(1);
            logicalFile2.advanceLastCheckpointId(2);

            assertThat(physicalFile2.isOpen()).isTrue();
            assertThat(physicalFile2.isDeleted()).isFalse();
            physicalFile2.close();
            assertThat(physicalFile2.isOpen()).isFalse();
            assertThat(physicalFile2.isDeleted()).isFalse();
            assertThat(physicalFile2.getRefCount()).isEqualTo(1);

            logicalFile2.discardWithCheckpointId(2);
            assertThat(logicalFile2.isDiscarded()).isTrue();
            assertThat(physicalFile2.isDeleted()).isTrue();
            assertThat(physicalFile2.getRefCount()).isEqualTo(0);
        }
    }

    @Test
    void testSizeStatsInPhysicalFile() throws IOException {
        try (FileMergingSnapshotManagerBase fmsm =
                (FileMergingSnapshotManagerBase)
                        createFileMergingSnapshotManager(checkpointBaseDir)) {
            fmsm.registerSubtaskForSharedStates(subtaskKey1);
            fmsm.registerSubtaskForSharedStates(subtaskKey2);
            PhysicalFile physicalFile =
                    fmsm.getOrCreatePhysicalFileForCheckpoint(
                            subtaskKey1, 0, CheckpointedStateScope.SHARED);

            assertThat(physicalFile.getSize()).isEqualTo(0);
            physicalFile.incSize(123);
            assertThat(physicalFile.getSize()).isEqualTo(123);
            physicalFile.incSize(456);
            assertThat(physicalFile.getSize()).isEqualTo(123 + 456);
        }
    }

    /**
     * Creates the checkpoint directory layout under {@code baseDir} and a {@link
     * FileMergingSnapshotManager} initialized to work on it.
     */
    private FileMergingSnapshotManager createFileMergingSnapshotManager(Path baseDir)
            throws IOException {
        FileSystem fs = LocalFileSystem.getSharedInstance();
        Path sharedStateDir =
                new Path(baseDir, AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR);
        Path taskOwnedStateDir =
                new Path(
                        baseDir,
                        AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR);
        // mkdirs is a no-op for already existing directories, so create every directory
        // unconditionally instead of skipping the sub-directories when the base dir exists.
        fs.mkdirs(baseDir);
        fs.mkdirs(sharedStateDir);
        fs.mkdirs(taskOwnedStateDir);

        FileMergingSnapshotManager fmsm = new FileMergingSnapshotManagerBuilder(tmId).build();
        assertThat(fmsm).isNotNull();
        fmsm.initFileSystem(fs, baseDir, sharedStateDir, taskOwnedStateDir);
        return fmsm;
    }
}