This is an automated email from the ASF dual-hosted git repository.

yuanmei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 01560a154fb [FLINK-32071][FLIP-306] Implement the snapshot manager for merged checkpoint files in TM
01560a154fb is described below

commit 01560a154fba41af4a9354da2588f78ea98089b7
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Tue Jun 20 14:56:32 2023 +0800

    [FLINK-32071][FLIP-306] Implement the snapshot manager for merged checkpoint files in TM
---
 .../filemerging/FileMergingSnapshotManager.java    | 175 +++++++++++
 .../FileMergingSnapshotManagerBase.java            | 337 +++++++++++++++++++++
 .../FileMergingSnapshotManagerBuilder.java         |  64 ++++
 .../checkpoint/filemerging/LogicalFile.java        | 171 +++++++++++
 .../checkpoint/filemerging/PhysicalFile.java       | 210 +++++++++++++
 ...WithinCheckpointFileMergingSnapshotManager.java | 101 ++++++
 .../FileMergingSnapshotManagerTest.java            | 277 +++++++++++++++++
 7 files changed, 1335 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
new file mode 100644
index 00000000000..4954d8548f3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
+
+import java.io.Closeable;
+
+/**
+ * FileMergingSnapshotManager provides an interface to manage files and meta information for
+ * checkpoint files when checkpoint file merging is enabled. It manages the files for ONE single
+ * task in a TM, including all subtasks of this single task that run in this TM. There is one
+ * FileMergingSnapshotManager per task per task manager.
+ *
+ * <p>TODO (FLINK-32073): create output stream.
+ *
+ * <p>TODO (FLINK-32075): leverage checkpoint notification to delete logical files.
+ */
+public interface FileMergingSnapshotManager extends Closeable {
+
+    /**
+     * Initialize the file system, recording the checkpoint path the manager should work with.
+     *
+     * <pre>
+     * The layout of checkpoint directory:
+     * /user-defined-checkpoint-dir
+     *     /{job-id} (checkpointBaseDir)
+     *         |
+     *         + --shared/
+     *             |
+     *             + --subtask-1/
+     *                 + -- merged shared state files
+     *             + --subtask-2/
+     *                 + -- merged shared state files
+     *         + --taskowned/
+     *             + -- merged private state files
+     *         + --chk-1/
+     *         + --chk-2/
+     *         + --chk-3/
+     * </pre>
+     *
+     * <p>The reason for initializing directories in this method instead of the constructor is
+     * that the FileMergingSnapshotManager itself belongs to the {@link TaskStateManager}, which is
+     * initialized when receiving a task, while the base directories for checkpoints are created by
+     * {@link FsCheckpointStorageAccess} when the state backend is initialized per subtask. After
+     * the checkpoint directories are initialized, the managed subdirectories are initialized here.
+     *
+     * <p>Note: This method may be called several times, so the implementation should ensure
+     * idempotency and throw an {@link IllegalArgumentException} when any of the paths in the
+     * parameters changes across function calls.
+     *
+     * @param fileSystem The filesystem to write to.
+     * @param checkpointBaseDir The base directory for checkpoints.
+     * @param sharedStateDir The directory for shared checkpoint data.
+     * @param taskOwnedStateDir The name of the directory for state not owned/released by the
+     *     master, but by the TaskManagers.
+     * @throws IllegalArgumentException thrown if these three paths are not deterministic across
+     *     calls.
+     */
+    void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir)
+            throws IllegalArgumentException;
+
+    /**
+     * Register a subtask and create the managed directory for shared states.
+     *
+     * @param subtaskKey the subtask key identifying a subtask.
+     * @see #initFileSystem for layout information.
+     */
+    void registerSubtaskForSharedStates(SubtaskKey subtaskKey);
+
+    /**
+     * Get the managed directory of the file-merging snapshot manager, created in {@link
+     * #initFileSystem} or {@link #registerSubtaskForSharedStates}.
+     *
+     * @param subtaskKey the subtask key identifying the subtask.
+     * @param scope the checkpoint scope.
+     * @return the managed directory for one subtask in the specified checkpoint scope.
+     */
+    Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
+
+    /**
+     * A key that identifies a subtask. A subtask can be identified by the operator id, the subtask
+     * index and the parallelism. Note that this key should be consistent across job attempts.
+     */
+    final class SubtaskKey {
+        final String operatorIDString;
+        final int subtaskIndex;
+        final int parallelism;
+
+        /**
+         * The cached hash code. Since instances of SubtaskKey are used as keys in a HashMap,
+         * caching the hash code may help improve performance.
+         */
+        final int hashCode;
+
+        public SubtaskKey(OperatorID operatorID, TaskInfo taskInfo) {
+            this(
+                    operatorID.toHexString(),
+                    taskInfo.getIndexOfThisSubtask(),
+                    taskInfo.getNumberOfParallelSubtasks());
+        }
+
+        SubtaskKey(String operatorIDString, int subtaskIndex, int parallelism) {
+            this.operatorIDString = operatorIDString;
+            this.subtaskIndex = subtaskIndex;
+            this.parallelism = parallelism;
+            int hash = operatorIDString.hashCode();
+            hash = 31 * hash + subtaskIndex;
+            hash = 31 * hash + parallelism;
+            this.hashCode = hash;
+        }
+
+        /**
+         * Generate a unique managed directory name for one subtask.
+         *
+         * @return the managed directory name.
+         */
+        public String getManagedDirName() {
+            return String.format("%s_%d_%d_", operatorIDString, subtaskIndex, parallelism)
+                    .replaceAll("[^a-zA-Z0-9\\-]", "_");
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            SubtaskKey that = (SubtaskKey) o;
+
+            return hashCode == that.hashCode
+                    && subtaskIndex == that.subtaskIndex
+                    && parallelism == that.parallelism
+                    && operatorIDString.equals(that.operatorIDString);
+        }
+
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+
+        @Override
+        public String toString() {
+            return String.format("%s(%d/%d)", operatorIDString, subtaskIndex, parallelism);
+        }
+    }
+}
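
For orientation, here is a minimal sketch of how a caller might wire this interface up, assuming the manager comes from the FileMergingSnapshotManagerBuilder added later in this commit; operatorID and taskInfo stand for the task's runtime info, and the paths are illustrative placeholders, not anything this patch prescribes:

    // Sketch only: initialize the manager once per task and resolve its managed directories.
    FileMergingSnapshotManager fmsm = new FileMergingSnapshotManagerBuilder("tm-1").build();
    Path checkpointBaseDir = new Path("file:///tmp/checkpoints/job-1");
    fmsm.initFileSystem(
            LocalFileSystem.getSharedInstance(),
            checkpointBaseDir,
            new Path(checkpointBaseDir, "shared"),
            new Path(checkpointBaseDir, "taskowned"));

    SubtaskKey subtaskKey = new SubtaskKey(operatorID, taskInfo);
    fmsm.registerSubtaskForSharedStates(subtaskKey);
    Path sharedDir = fmsm.getManagedDir(subtaskKey, CheckpointedStateScope.SHARED);
    Path exclusiveDir = fmsm.getManagedDir(subtaskKey, CheckpointedStateScope.EXCLUSIVE);
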
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
new file mode 100644
index 00000000000..e775c0fed64
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.core.fs.EntropyInjector;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.OutputStreamAndPath;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile.PhysicalFileDeleter;
+
+/** Base implementation of {@link FileMergingSnapshotManager}. */
+public abstract class FileMergingSnapshotManagerBase implements FileMergingSnapshotManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(FileMergingSnapshotManager.class);
+
+    /** The identifier of this manager. */
+    private final String id;
+
+    /** The executor for I/O operations in this manager. */
+    protected final Executor ioExecutor;
+
+    /** The {@link FileSystem} that this manager works on. */
+    protected FileSystem fs;
+
+    // checkpoint directories
+    protected Path checkpointDir;
+    protected Path sharedStateDir;
+    protected Path taskOwnedStateDir;
+
+    /**
+     * The file system should only be initialized once.
+     *
+     * @see FileMergingSnapshotManager#initFileSystem for the reason why this guard is needed.
+     */
+    private boolean fileSystemInitiated = false;
+
+    /**
+     * File-system dependent value. Marks whether the file system this manager runs on needs a
+     * sync for visibility. If true, do a file sync after writing each segment.
+     */
+    protected boolean shouldSyncAfterClosingLogicalFile;
+
+    protected PhysicalFileDeleter physicalFileDeleter = this::deletePhysicalFile;
+
+    /**
+     * Currently the shared state files are merged within each subtask, and the files of different
+     * subtasks are split into different directories.
+     */
+    private final Map<SubtaskKey, Path> managedSharedStateDir = new ConcurrentHashMap<>();
+
+    /**
+     * The private state files are merged across subtasks; there is only one directory for merged
+     * files within one TM per job.
+     */
+    protected Path managedExclusiveStateDir;
+
+    public FileMergingSnapshotManagerBase(String id, Executor ioExecutor) {
+        this.id = id;
+        this.ioExecutor = ioExecutor;
+    }
+
+    @Override
+    public void initFileSystem(
+            FileSystem fileSystem,
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir)
+            throws IllegalArgumentException {
+        if (fileSystemInitiated) {
+            Preconditions.checkArgument(
+                    checkpointBaseDir.equals(this.checkpointDir),
+                    "The checkpoint base dir is not deterministic across subtasks.");
+            Preconditions.checkArgument(
+                    sharedStateDir.equals(this.sharedStateDir),
+                    "The shared checkpoint dir is not deterministic across subtasks.");
+            Preconditions.checkArgument(
+                    taskOwnedStateDir.equals(this.taskOwnedStateDir),
+                    "The task-owned checkpoint dir is not deterministic across subtasks.");
+            return;
+        }
+        this.fs = fileSystem;
+        this.checkpointDir = Preconditions.checkNotNull(checkpointBaseDir);
+        this.sharedStateDir = Preconditions.checkNotNull(sharedStateDir);
+        this.taskOwnedStateDir = Preconditions.checkNotNull(taskOwnedStateDir);
+        this.fileSystemInitiated = true;
+        this.shouldSyncAfterClosingLogicalFile = shouldSyncAfterClosingLogicalFile(fileSystem);
+        // Initialize the managed exclusive path using id as the child path name.
+        // Currently, we use the task-owned directory to place the merged private state. According
+        // to FLIP-306, we may later move these files to the newly introduced task-manager-owned
+        // directory.
+        Path managedExclusivePath = new Path(taskOwnedStateDir, id);
+        createManagedDirectory(managedExclusivePath);
+        this.managedExclusiveStateDir = managedExclusivePath;
+    }
+
+    @Override
+    public void registerSubtaskForSharedStates(SubtaskKey subtaskKey) {
+        String managedDirName = subtaskKey.getManagedDirName();
+        Path managedPath = new Path(sharedStateDir, managedDirName);
+        if (!managedSharedStateDir.containsKey(subtaskKey)) {
+            createManagedDirectory(managedPath);
+            managedSharedStateDir.put(subtaskKey, managedPath);
+        }
+    }
+
+    // ------------------------------------------------------------------------
+    //  logical & physical file
+    // ------------------------------------------------------------------------
+
+    /**
+     * Create a logical file on a physical file.
+     *
+     * @param physicalFile the underlying physical file.
+     * @param startOffset the offset in the physical file where the logical file starts.
+     * @param length the length of the logical file.
+     * @param subtaskKey the id of the subtask that the logical file belongs to.
+     * @return the created logical file.
+     */
+    protected LogicalFile createLogicalFile(
+            @Nonnull PhysicalFile physicalFile,
+            int startOffset,
+            int length,
+            @Nonnull SubtaskKey subtaskKey) {
+        LogicalFileId fileID = LogicalFileId.generateRandomId();
+        return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
+    }
+
+    /**
+     * Create a physical file in the right location (managed directory), which is specified by the
+     * scope of this checkpoint and the current subtask.
+     *
+     * @param subtaskKey the {@link SubtaskKey} of current subtask.
+     * @param scope the scope of the checkpoint.
+     * @return the created physical file.
+     * @throws IOException if anything goes wrong with file system.
+     */
+    @Nonnull
+    protected PhysicalFile createPhysicalFile(SubtaskKey subtaskKey, CheckpointedStateScope scope)
+            throws IOException {
+        PhysicalFile result;
+        Exception latestException = null;
+
+        Path dirPath = getManagedDir(subtaskKey, scope);
+
+        if (dirPath == null) {
+            throw new IOException(
+                    "Could not get "
+                            + scope
+                            + " path for subtask "
+                            + subtaskKey
+                            + ", the directory may have not been created.");
+        }
+
+        for (int attempt = 0; attempt < 10; attempt++) {
+            try {
+                OutputStreamAndPath streamAndPath =
+                        EntropyInjector.createEntropyAware(
+                                fs,
+                                generatePhysicalFilePath(dirPath),
+                                FileSystem.WriteMode.NO_OVERWRITE);
+                FSDataOutputStream outputStream = streamAndPath.stream();
+                Path filePath = streamAndPath.path();
+                result = new PhysicalFile(outputStream, filePath, this.physicalFileDeleter, scope);
+                updateFileCreationMetrics(filePath);
+                return result;
+            } catch (Exception e) {
+                latestException = e;
+            }
+        }
+
+        throw new IOException(
+                "Could not open output stream for state file merging.", latestException);
+    }
+
+    private void updateFileCreationMetrics(Path path) {
+        // TODO: FLINK-32091 add io metrics
+        LOG.debug("Create a new physical file {} for checkpoint file merging.", path);
+    }
+
+    /**
+     * Generate a file path for a physical file.
+     *
+     * @param dirPath the parent directory path for the physical file.
+     * @return the generated file path for a physical file.
+     */
+    protected Path generatePhysicalFilePath(Path dirPath) {
+        // this must be called after initFileSystem() is called
+        // so the checkpoint directories must be not null if we reach here
+        final String fileName = UUID.randomUUID().toString();
+        return new Path(dirPath, fileName);
+    }
+
+    /**
+     * Delete a physical file by the given file path. Use the I/O executor to do the deletion.
+     *
+     * @param filePath the given file path to delete.
+     */
+    protected final void deletePhysicalFile(Path filePath) {
+        ioExecutor.execute(
+                () -> {
+                    try {
+                        fs.delete(filePath, false);
+                        LOG.debug("Physical file deleted: {}.", filePath);
+                    } catch (IOException e) {
+                        LOG.warn("Fail to delete file: {}", filePath);
+                    }
+                });
+    }
+
+    // ------------------------------------------------------------------------
+    //  abstract methods
+    // ------------------------------------------------------------------------
+
+    /**
+     * Get a reused physical file or create one. This will be called in checkpoint output stream
+     * creation logic.
+     *
+     * <p>TODO (FLINK-32073): Implement a CheckpointStreamFactory for file-merging that uses this
+     * method to create or reuse physical files.
+     *
+     * <p>Basic logic of file reusing: whenever a physical file is needed, this method is called
+     * with necessary information provided for acquiring a file. The file will not be reused until
+     * it is written and returned to the reused pool by calling {@link
+     * #returnPhysicalFileForNextReuse}.
+     *
+     * @param subtaskKey the subtask key for the caller
+     * @param checkpointId the checkpoint id
+     * @param scope checkpoint scope
+     * @return the requested physical file.
+     * @throws IOException thrown if anything goes wrong with file system.
+     */
+    @Nonnull
+    protected abstract PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope)
+            throws IOException;
+
+    /**
+     * Try to return an existing physical file to the manager for next reuse. If this physical file
+     * is no longer needed (for reusing), it will be closed.
+     *
+     * <p>For the basic logic of file reusing, see {@link #getOrCreatePhysicalFileForCheckpoint}.
+     *
+     * @param subtaskKey the subtask key for the caller
+     * @param checkpointId in which checkpoint this physical file is requested.
+     * @param physicalFile the physical file to return.
+     * @throws IOException thrown if anything goes wrong with file system.
+     * @see #getOrCreatePhysicalFileForCheckpoint(SubtaskKey, long, CheckpointedStateScope)
+     */
+    protected abstract void returnPhysicalFileForNextReuse(
+            SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile) throws IOException;
+
+    // ------------------------------------------------------------------------
+    //  file system
+    // ------------------------------------------------------------------------
+
+    @Override
+    public Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope) {
+        if (scope.equals(CheckpointedStateScope.SHARED)) {
+            return managedSharedStateDir.get(subtaskKey);
+        } else {
+            return managedExclusiveStateDir;
+        }
+    }
+
+    static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) {
+        // Currently, we do file sync regardless of the file system.
+        // TODO: Determine whether to do file sync more wisely. Add an interface to FileSystem if
+        // needed.
+        return true;
+    }
+
+    // ------------------------------------------------------------------------
+    //  utilities
+    // ------------------------------------------------------------------------
+
+    private void createManagedDirectory(Path managedPath) {
+        try {
+            FileStatus fileStatus = null;
+            try {
+                fileStatus = fs.getFileStatus(managedPath);
+            } catch (FileNotFoundException e) {
+                // expected exception when the path does not exist, and we ignore it.
+            }
+            if (fileStatus == null) {
+                fs.mkdirs(managedPath);
+                LOG.info("Created a directory {} for checkpoint file-merging.", managedPath);
+            } else if (fileStatus.isDir()) {
+                LOG.info("Reusing previous directory {} for checkpoint file-merging.", managedPath);
+            } else {
+                throw new FlinkRuntimeException(
+                        "The managed path "
+                                + managedPath
+                                + " for file-merging is occupied by another file. Cannot create directory.");
+            }
+        } catch (IOException e) {
+            throw new FlinkRuntimeException(
+                    "Cannot create directory " + managedPath + " for file-merging ", e);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {}
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
new file mode 100644
index 00000000000..f07669b094b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBuilder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.Executor;
+
+/** A builder that builds the {@link FileMergingSnapshotManager}. */
+public class FileMergingSnapshotManagerBuilder {
+
+    /** The id identifying a {@link FileMergingSnapshotManager}. */
+    private final String id;
+
+    @Nullable private Executor ioExecutor = null;
+
+    /**
+     * Initialize the builder.
+     *
+     * @param id the id of the manager.
+     */
+    public FileMergingSnapshotManagerBuilder(String id) {
+        this.id = id;
+    }
+
+    /**
+     * Set the executor for I/O operations in the manager. If null (the default), all I/O
+     * operations will be executed synchronously.
+     */
+    public FileMergingSnapshotManagerBuilder setIOExecutor(@Nullable Executor ioExecutor) {
+        this.ioExecutor = ioExecutor;
+        return this;
+    }
+
+    /**
+     * Create file-merging snapshot manager based on configuration.
+     *
+     * <p>TODO (FLINK-32072): Create manager during the initialization of task manager services.
+     *
+     * <p>TODO (FLINK-32074): Support another type of FileMergingSnapshotManager that merges files
+     * across different checkpoints.
+     *
+     * @return the created manager.
+     */
+    public FileMergingSnapshotManager build() {
+        return new WithinCheckpointFileMergingSnapshotManager(
+                id, ioExecutor == null ? Runnable::run : ioExecutor);
+    }
+}
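
A hedged example of how this builder might be used; the single-thread executor here is an assumption for illustration, not something this commit prescribes:

    // Sketch only: build a manager whose file deletions run asynchronously on a dedicated executor.
    Executor ioExecutor = Executors.newSingleThreadExecutor();
    FileMergingSnapshotManager manager =
            new FileMergingSnapshotManagerBuilder("tm-1").setIOExecutor(ioExecutor).build();
    // Omitting setIOExecutor (or passing null) falls back to Runnable::run, i.e. synchronous I/O.
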
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java
new file mode 100644
index 00000000000..1a113d6b4b8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.StringBasedID;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+
+/**
+ * An abstraction of logical files in file-merging checkpoints. It stands for a data segment, that
+ * is, a single file before file-merging.
+ */
+public class LogicalFile {
+
+    /** ID for {@link LogicalFile}. It should be unique for each file. */
+    public static class LogicalFileId extends StringBasedID {
+
+        public LogicalFileId(String keyString) {
+            super(keyString);
+        }
+
+        public Path getFilePath() {
+            return new Path(getKeyString());
+        }
+
+        public static LogicalFileId generateRandomId() {
+            return new LogicalFileId(UUID.randomUUID().toString());
+        }
+    }
+
+    /** ID for this file. */
+    LogicalFileId fileId;
+
+    /**
+     * The id of the last checkpoint that uses this logical file. This acts as a watermark
+     * determining whether this logical file could be removed.
+     *
+     * @see #discardWithCheckpointId(long)
+     * @see #advanceLastCheckpointId(long)
+     */
+    private long lastUsedCheckpointID = -1L;
+
+    /** Whether this logical file is removed by checkpoint subsumption/abortion. */
+    boolean discarded = false;
+
+    /** The physical file where this logical file is stored. This should never be null. */
+    @Nonnull private final PhysicalFile physicalFile;
+
+    /** The offset of the physical file that this logical file starts from. */
+    private final int startOffset;
+
+    /** The length of this logical file. */
+    private final int length;
+
+    /** The id of the subtask that this logical file belongs to. */
+    @Nonnull private final SubtaskKey subtaskKey;
+
+    public LogicalFile(
+            LogicalFileId fileId,
+            @Nonnull PhysicalFile physicalFile,
+            int startOffset,
+            int length,
+            @Nonnull SubtaskKey subtaskKey) {
+        this.fileId = fileId;
+        this.physicalFile = physicalFile;
+        this.startOffset = startOffset;
+        this.length = length;
+        this.subtaskKey = subtaskKey;
+        physicalFile.incRefCount();
+    }
+
+    public LogicalFileId getFileId() {
+        return fileId;
+    }
+
+    /**
+     * A logical file may be shared across checkpoints (especially for shared state). When this
+     * logical file is used/reused by a checkpoint, update the last checkpoint id that uses this
+     * logical file.
+     *
+     * @param checkpointId the checkpoint that uses this logical file.
+     */
+    public void advanceLastCheckpointId(long checkpointId) {
+        if (checkpointId > lastUsedCheckpointID) {
+            this.lastUsedCheckpointID = checkpointId;
+        }
+    }
+
+    /**
+     * When a checkpoint that uses this logical file is subsumed or aborted, discard this logical
+     * file. If this file is used by a later checkpoint, the file should not be discarded. Note
+     * that the removal of a logical file may cause the deletion of its physical file.
+     *
+     * @param checkpointId the checkpoint that is notified subsumed or aborted.
+     * @throws IOException if anything goes wrong with file system.
+     */
+    public void discardWithCheckpointId(long checkpointId) throws IOException {
+        if (!discarded && checkpointId >= lastUsedCheckpointID) {
+            physicalFile.decRefCount();
+            discarded = true;
+        }
+    }
+
+    public long getLastUsedCheckpointID() {
+        return lastUsedCheckpointID;
+    }
+
+    @Nonnull
+    public PhysicalFile getPhysicalFile() {
+        return physicalFile;
+    }
+
+    public int getStartOffset() {
+        return startOffset;
+    }
+
+    public int getLength() {
+        return length;
+    }
+
+    @Nonnull
+    public SubtaskKey getSubtaskKey() {
+        return subtaskKey;
+    }
+
+    @VisibleForTesting
+    public boolean isDiscarded() {
+        return discarded;
+    }
+
+    @Override
+    public int hashCode() {
+        return fileId.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        LogicalFile that = (LogicalFile) o;
+        return fileId.equals(that.fileId);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java
new file mode 100644
index 00000000000..2709aaabbd7
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** An abstraction of physical files in file-merging checkpoints. */
+public class PhysicalFile {
+
+    private static final Logger LOG = LoggerFactory.getLogger(PhysicalFile.class);
+
+    /** Functional interface to delete the physical file. */
+    @FunctionalInterface
+    public interface PhysicalFileDeleter {
+        /** Delete the file. */
+        void perform(Path filePath) throws IOException;
+    }
+
+    /**
+     * Output stream to the file, which is kept open for writing. It can be null if the file is
+     * closed.
+     */
+    @Nullable private FSDataOutputStream outputStream;
+
+    /** Reference count from the logical files. */
+    private final AtomicInteger logicalFileRefCount;
+
+    /** The size of this physical file. */
+    private final AtomicLong size;
+
+    /**
+     * Deleter that will be called when deleting this physical file. If null, do not delete this
+     * physical file.
+     */
+    @Nullable private final PhysicalFileDeleter deleter;
+
+    private final Path filePath;
+
+    private final CheckpointedStateScope scope;
+
+    /**
+     * If a physical file is closed, it means no more file segments will be written to the physical
+     * file, and it can be deleted once its logicalFileRefCount decreases to 0.
+     */
+    private boolean closed;
+
+    /**
+     * A file can be deleted if: 1. It is closed, and 2. No more {@link LogicalFile}s have a
+     * reference to it.
+     */
+    private boolean deleted = false;
+
+    public PhysicalFile(
+            @Nullable FSDataOutputStream outputStream,
+            Path filePath,
+            @Nullable PhysicalFileDeleter deleter,
+            CheckpointedStateScope scope) {
+        this.filePath = filePath;
+        this.outputStream = outputStream;
+        this.closed = outputStream == null;
+        this.deleter = deleter;
+        this.scope = scope;
+        this.size = new AtomicLong(0);
+        this.logicalFileRefCount = new AtomicInteger(0);
+    }
+
+    @Nullable
+    public FSDataOutputStream getOutputStream() {
+        return outputStream;
+    }
+
+    void incRefCount() {
+        int newValue = this.logicalFileRefCount.incrementAndGet();
+        LOG.trace(
+                "Increase the reference count of physical file: {} by 1. New value is: {}.",
+                this.filePath,
+                newValue);
+    }
+
+    void decRefCount() throws IOException {
+        Preconditions.checkArgument(logicalFileRefCount.get() > 0);
+        int newValue = this.logicalFileRefCount.decrementAndGet();
+        LOG.trace(
+                "Decrease the reference count of physical file: {} by 1. New value is: {}. ",
+                this.filePath,
+                newValue);
+        deleteIfNecessary();
+    }
+
+    /**
+     * Delete this physical file if there is no reference count from logical files (all discarded),
+     * and this physical file is closed (no further writing on it).
+     *
+     * @throws IOException if anything goes wrong with file system.
+     */
+    public void deleteIfNecessary() throws IOException {
+        synchronized (this) {
+            if (!isOpen() && !deleted && this.logicalFileRefCount.get() <= 0) {
+                if (outputStream != null) {
+                    try {
+                        outputStream.close();
+                    } catch (IOException e) {
+                        LOG.warn("Fail to close output stream when deleting file: {}", filePath);
+                    }
+                }
+                if (deleter != null) {
+                    deleter.perform(filePath);
+                }
+                this.deleted = true;
+            }
+        }
+    }
+
+    void incSize(long delta) {
+        this.size.addAndGet(delta);
+    }
+
+    long getSize() {
+        return size.get();
+    }
+
+    @VisibleForTesting
+    int getRefCount() {
+        return logicalFileRefCount.get();
+    }
+
+    public void close() throws IOException {
+        innerClose();
+        deleteIfNecessary();
+    }
+
+    /**
+     * Close the physical file and stop reusing it.
+     *
+     * @throws IOException if anything goes wrong with file system.
+     */
+    private void innerClose() throws IOException {
+        closed = true;
+        if (outputStream != null) {
+            outputStream.close();
+            outputStream = null;
+        }
+    }
+
+    /** @return whether this physical file is still open for writing. */
+    public boolean isOpen() {
+        return !closed && outputStream != null;
+    }
+
+    public boolean isDeleted() {
+        return deleted;
+    }
+
+    public Path getFilePath() {
+        return filePath;
+    }
+
+    public CheckpointedStateScope getScope() {
+        return scope;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        PhysicalFile that = (PhysicalFile) o;
+        return filePath.equals(that.filePath);
+    }
+
+    @Override
+    public String toString() {
+        return String.format(
+                "Physical File: [%s], closed: %s, logicalFileRefCount: %d",
+                filePath, closed, logicalFileRefCount.get());
+    }
+}
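
To illustrate the reference-counting contract between LogicalFile and PhysicalFile, a short sketch; outputStream, filePath, deleter, scope and subtaskKey are placeholder values, not defined by this patch:

    // Sketch only: a LogicalFile pins its PhysicalFile; the physical file is deleted
    // only after it is closed AND the last referencing logical file is discarded.
    PhysicalFile physicalFile = new PhysicalFile(outputStream, filePath, deleter, scope);
    LogicalFile logicalFile =
            new LogicalFile(LogicalFileId.generateRandomId(), physicalFile, 0, 42, subtaskKey);

    logicalFile.advanceLastCheckpointId(1);  // checkpoint 1 uses this segment
    physicalFile.close();                    // no further segments will be written
    logicalFile.discardWithCheckpointId(1);  // ref count drops to 0 -> physical file is deleted
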
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
new file mode 100644
index 00000000000..e8e79f0c052
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executor;
+
+/** A {@link FileMergingSnapshotManager} that merges files within a checkpoint. */
+public class WithinCheckpointFileMergingSnapshotManager extends FileMergingSnapshotManagerBase {
+
+    /** A dummy subtask key to reuse files among subtasks for private states. */
+    private static final SubtaskKey DUMMY_SUBTASK_KEY = new SubtaskKey("dummy", -1, -1);
+
+    /**
+     * For WITHIN_BOUNDARY mode, physical files are NOT shared among multiple checkpoints. This map
+     * contains all physical files that are still writable and not occupied by a writer. The key of
+     * this map consists of the checkpoint id, the subtask key, and the checkpoint scope, which
+     * collectively determine the physical file to be reused.
+     */
+    private final Map<Tuple3<Long, SubtaskKey, CheckpointedStateScope>, PhysicalFile>
+            writablePhysicalFilePool;
+
+    public WithinCheckpointFileMergingSnapshotManager(String id, Executor ioExecutor) {
+        // currently there is no file size limit for WITHIN_BOUNDARY mode
+        super(id, ioExecutor);
+        writablePhysicalFilePool = new HashMap<>();
+    }
+
+    @Override
+    @Nonnull
+    protected PhysicalFile getOrCreatePhysicalFileForCheckpoint(
+            SubtaskKey subtaskKey, long checkpointId, CheckpointedStateScope scope)
+            throws IOException {
+        // TODO: FLINK-32076 will add a file pool for each subtask key.
+        Tuple3<Long, SubtaskKey, CheckpointedStateScope> fileKey =
+                Tuple3.of(
+                        checkpointId,
+                        scope == CheckpointedStateScope.SHARED ? subtaskKey : DUMMY_SUBTASK_KEY,
+                        scope);
+        PhysicalFile file;
+        synchronized (writablePhysicalFilePool) {
+            file = writablePhysicalFilePool.remove(fileKey);
+            if (file == null) {
+                file = createPhysicalFile(subtaskKey, scope);
+            }
+        }
+        return file;
+    }
+
+    @Override
+    protected void returnPhysicalFileForNextReuse(
+            SubtaskKey subtaskKey, long checkpointId, PhysicalFile physicalFile)
+            throws IOException {
+        // TODO: FLINK-32076 will add a file pool for reusing.
+        CheckpointedStateScope scope = physicalFile.getScope();
+        Tuple3<Long, SubtaskKey, CheckpointedStateScope> fileKey =
+                Tuple3.of(
+                        checkpointId,
+                        scope == CheckpointedStateScope.SHARED ? subtaskKey : DUMMY_SUBTASK_KEY,
+                        scope);
+        PhysicalFile current;
+        synchronized (writablePhysicalFilePool) {
+            current = writablePhysicalFilePool.putIfAbsent(fileKey, physicalFile);
+        }
+        // TODO: We sync the file when returning it to the reuse pool for safety. Actually it could be
+        // optimized after FLINK-32075.
+        if (shouldSyncAfterClosingLogicalFile) {
+            FSDataOutputStream os = physicalFile.getOutputStream();
+            if (os != null) {
+                os.sync();
+            }
+        }
+        if (current != physicalFile) {
+            physicalFile.close();
+        }
+    }
+}
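
A brief sketch of the acquire/return cycle this subclass implements; it assumes same-package access because both methods are protected, which is also how the test below exercises them, and fmsm and subtaskKey are placeholders:

    // Sketch only: acquire a physical file for (checkpoint 1, subtaskKey, SHARED scope),
    // write one segment through its stream, then hand it back so the next writer of the
    // same key can append to the same physical file.
    PhysicalFile file =
            fmsm.getOrCreatePhysicalFileForCheckpoint(subtaskKey, 1L, CheckpointedStateScope.SHARED);
    // ... write a state segment via file.getOutputStream() and remember its offset/length ...
    fmsm.returnPhysicalFileForNextReuse(subtaskKey, 1L, file);
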
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
new file mode 100644
index 00000000000..c8299575ae8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTest.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.filemerging;
+
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileMergingSnapshotManager}. */
+public class FileMergingSnapshotManagerTest {
+
+    private final String tmId = "Testing";
+
+    private final OperatorID operatorID = new OperatorID(289347923L, 75893479L);
+
+    private SubtaskKey subtaskKey1;
+    private SubtaskKey subtaskKey2;
+
+    private Path checkpointBaseDir;
+
+    @BeforeEach
+    public void setup(@TempDir java.nio.file.Path tempFolder) {
+        // use simplified job ids for the tests
+        long jobId = 1;
+        subtaskKey1 = new SubtaskKey(operatorID, new TaskInfo("TestingTask", 128, 0, 128, 3));
+        subtaskKey2 = new SubtaskKey(operatorID, new TaskInfo("TestingTask", 128, 1, 128, 3));
+        checkpointBaseDir = new Path(tempFolder.toString(), String.valueOf(jobId));
+    }
+
+    @Test
+    public void testCreateFileMergingSnapshotManager() throws IOException {
+        try (FileMergingSnapshotManagerBase fmsm =
+                (FileMergingSnapshotManagerBase)
+                        createFileMergingSnapshotManager(checkpointBaseDir)) {
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
+            assertThat(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE))
+                    .isEqualTo(
+                            new Path(
+                                    checkpointBaseDir,
+                                    AbstractFsCheckpointStorageAccess
+                                                    .CHECKPOINT_TASK_OWNED_STATE_DIR
+                                            + "/"
+                                            + tmId));
+            assertThat(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED))
+                    .isEqualTo(
+                            new Path(
+                                    checkpointBaseDir,
+                                    AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR
+                                            + "/"
+                                            + subtaskKey1.getManagedDirName()));
+        }
+    }
+
+    @Test
+    public void testCreateAndReuseFiles() throws IOException {
+        try (FileMergingSnapshotManagerBase fmsm =
+                (FileMergingSnapshotManagerBase)
+                        createFileMergingSnapshotManager(checkpointBaseDir)) {
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
+            fmsm.registerSubtaskForSharedStates(subtaskKey2);
+            // firstly, we try shared state.
+            PhysicalFile file1 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file1.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            // allocate another
+            PhysicalFile file2 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file2.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            assertThat(file2).isNotEqualTo(file1);
+
+            // return for reuse
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
+
+            // allocate for another subtask
+            PhysicalFile file3 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey2, 0, CheckpointedStateScope.SHARED);
+            assertThat(file3.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED));
+            assertThat(file3).isNotEqualTo(file1);
+
+            // allocate for another checkpoint
+            PhysicalFile file4 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.SHARED);
+            assertThat(file4.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            assertThat(file4).isNotEqualTo(file1);
+
+            // allocate for this checkpoint
+            PhysicalFile file5 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(file5.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
+            assertThat(file5).isEqualTo(file1);
+
+            // Secondly, we try private state
+            PhysicalFile file6 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file6.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+
+            // allocate another
+            PhysicalFile file7 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file7.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file7).isNotEqualTo(file6);
+
+            // return for reuse
+            fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6);
+
+            // allocate for another checkpoint
+            PhysicalFile file8 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file8.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file8).isNotEqualTo(file6);
+
+            // allocate for this checkpoint but another subtask
+            PhysicalFile file9 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey2, 0, CheckpointedStateScope.EXCLUSIVE);
+            assertThat(file9.getFilePath().getParent())
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
+            assertThat(file9).isEqualTo(file6);
+
+            assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE))
+                    .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
+        }
+    }
+
+    @Test
+    public void testRefCountBetweenLogicalAndPhysicalFiles() throws IOException {
+        try (FileMergingSnapshotManagerBase fmsm =
+                (FileMergingSnapshotManagerBase)
+                        createFileMergingSnapshotManager(checkpointBaseDir)) {
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
+            fmsm.registerSubtaskForSharedStates(subtaskKey2);
+
+            PhysicalFile physicalFile1 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            assertThat(physicalFile1.isOpen()).isTrue();
+
+            LogicalFile logicalFile1 = fmsm.createLogicalFile(physicalFile1, 0, 10, subtaskKey1);
+            assertThat(logicalFile1.getSubtaskKey()).isEqualTo(subtaskKey1);
+            assertThat(logicalFile1.getPhysicalFile()).isEqualTo(physicalFile1);
+            assertThat(logicalFile1.getStartOffset()).isEqualTo(0);
+            assertThat(logicalFile1.getLength()).isEqualTo(10);
+            assertThat(physicalFile1.getRefCount()).isEqualTo(1);
+
+            assertThat(logicalFile1.isDiscarded()).isFalse();
+            logicalFile1.advanceLastCheckpointId(2);
+            assertThat(logicalFile1.getLastUsedCheckpointID()).isEqualTo(2);
+            logicalFile1.advanceLastCheckpointId(1);
+            assertThat(logicalFile1.getLastUsedCheckpointID()).isEqualTo(2);
+            logicalFile1.discardWithCheckpointId(1);
+            assertThat(logicalFile1.isDiscarded()).isFalse();
+            logicalFile1.discardWithCheckpointId(2);
+            assertThat(logicalFile1.isDiscarded()).isTrue();
+
+            // the stream is still open for reuse
+            assertThat(physicalFile1.isOpen()).isTrue();
+            assertThat(physicalFile1.isDeleted()).isFalse();
+            assertThat(physicalFile1.getRefCount()).isEqualTo(0);
+
+            physicalFile1.close();
+            assertThat(physicalFile1.isOpen()).isFalse();
+            assertThat(physicalFile1.isDeleted()).isTrue();
+
+            // try close physical file but not deleted
+            PhysicalFile physicalFile2 =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+            LogicalFile logicalFile2 = fmsm.createLogicalFile(physicalFile2, 0, 10, subtaskKey1);
+            assertThat(logicalFile2.getPhysicalFile()).isEqualTo(physicalFile2);
+            assertThat(logicalFile2.getStartOffset()).isEqualTo(0);
+            assertThat(logicalFile2.getLength()).isEqualTo(10);
+            assertThat(physicalFile2.getRefCount()).isEqualTo(1);
+            logicalFile2.advanceLastCheckpointId(2);
+
+            assertThat(physicalFile2.isOpen()).isTrue();
+            assertThat(physicalFile2.isDeleted()).isFalse();
+            physicalFile2.close();
+            assertThat(physicalFile2.isOpen()).isFalse();
+            assertThat(physicalFile2.isDeleted()).isFalse();
+            assertThat(physicalFile2.getRefCount()).isEqualTo(1);
+
+            logicalFile2.discardWithCheckpointId(2);
+            assertThat(logicalFile2.isDiscarded()).isTrue();
+            assertThat(physicalFile2.isDeleted()).isTrue();
+            assertThat(physicalFile2.getRefCount()).isEqualTo(0);
+        }
+    }
+
+    @Test
+    public void testSizeStatsInPhysicalFile() throws IOException {
+        try (FileMergingSnapshotManagerBase fmsm =
+                (FileMergingSnapshotManagerBase)
+                        createFileMergingSnapshotManager(checkpointBaseDir)) {
+            fmsm.registerSubtaskForSharedStates(subtaskKey1);
+            fmsm.registerSubtaskForSharedStates(subtaskKey2);
+            PhysicalFile physicalFile =
+                    fmsm.getOrCreatePhysicalFileForCheckpoint(
+                            subtaskKey1, 0, CheckpointedStateScope.SHARED);
+
+            assertThat(physicalFile.getSize()).isEqualTo(0);
+            physicalFile.incSize(123);
+            assertThat(physicalFile.getSize()).isEqualTo(123);
+            physicalFile.incSize(456);
+            assertThat(physicalFile.getSize()).isEqualTo(123 + 456);
+        }
+    }
+
+    private FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir)
+            throws IOException {
+        FileSystem fs = LocalFileSystem.getSharedInstance();
+        Path sharedStateDir =
+                new Path(
+                        checkpointBaseDir,
+                        AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR);
+        Path taskOwnedStateDir =
+                new Path(
+                        checkpointBaseDir,
+                        AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR);
+        if (!fs.exists(checkpointBaseDir)) {
+            fs.mkdirs(checkpointBaseDir);
+            fs.mkdirs(sharedStateDir);
+            fs.mkdirs(taskOwnedStateDir);
+        }
+        FileMergingSnapshotManager fmsm = new FileMergingSnapshotManagerBuilder(tmId).build();
+        fmsm.initFileSystem(
+                LocalFileSystem.getSharedInstance(),
+                checkpointBaseDir,
+                sharedStateDir,
+                taskOwnedStateDir);
+        assertThat(fmsm).isNotNull();
+        return fmsm;
+    }
+}
