This is an automated email from the ASF dual-hosted git repository.

leiyanfei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 05b27be4112 [FLINK-34668][checkpoint] Report operator state handle of 
file merging directory to JM
05b27be4112 is described below

commit 05b27be4112e1f5ca4c5614b1714a202c13343b6
Author: fredia <fredia...@gmail.com>
AuthorDate: Mon Mar 18 14:05:52 2024 +0800

    [FLINK-34668][checkpoint] Report operator state handle of file merging 
directory to JM
---
 .../filemerging/FileMergingSnapshotManager.java    |  12 ++
 .../FileMergingSnapshotManagerBase.java            |  32 +++++
 ...efaultOperatorStateBackendSnapshotStrategy.java |  27 +++-
 .../filemerging/DirectoryStreamStateHandle.java    |  90 ++++++++++++
 .../EmptyFileMergingOperatorStreamStateHandle.java |  64 +++++++++
 .../filemerging/EmptySegmentFileStateHandle.java   |  46 ++++++
 .../FileMergingOperatorStreamStateHandle.java      | 154 +++++++++++++++++++++
 .../filemerging/SegmentFileStateHandle.java        |   5 +-
 .../FileMergingCheckpointStateOutputStream.java    |   2 +-
 .../FsMergingCheckpointStorageLocation.java        |  11 ++
 ...ssCheckpointFileMergingSnapshotManagerTest.java |   1 +
 .../FileMergingSnapshotManagerTestBase.java        |   1 +
 ...inCheckpointFileMergingSnapshotManagerTest.java |   1 +
 .../runtime/state/OperatorStateBackendTest.java    |  85 ++++++++++++
 .../runtime/state/SharedStateRegistryTest.java     |  86 ++++++++++++
 ...FileMergingCheckpointStateOutputStreamTest.java |   2 +-
 .../FsMergingCheckpointStorageLocationTest.java    |   2 +-
 17 files changed, 614 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index cc54854a7a3..afc5eb618dc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;
 
@@ -117,6 +118,17 @@ public interface FileMergingSnapshotManager extends 
Closeable {
      */
     Path getManagedDir(SubtaskKey subtaskKey, CheckpointedStateScope scope);
 
+    /**
+     * Get the {@link DirectoryStreamStateHandle} of the managed directory, 
created in {@link
+     * #initFileSystem} or {@link #registerSubtaskForSharedStates}.
+     *
+     * @param subtaskKey the subtask key identifying the subtask.
+     * @param scope the checkpoint scope.
+     * @return the {@link DirectoryStreamStateHandle} for one subtask in 
specified checkpoint scope.
+     */
+    DirectoryStreamStateHandle getManagedDirStateHandle(
+            SubtaskKey subtaskKey, CheckpointedStateScope scope);
+
     /**
      * Notifies the manager that the checkpoint with the given {@code 
checkpointId} completed and
      * was committed.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index 3dc9c788561..d46edcd235e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 import javax.annotation.concurrent.GuardedBy;
 
+import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.HashSet;
@@ -105,12 +108,24 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
      */
     private final Map<SubtaskKey, Path> managedSharedStateDir = new 
ConcurrentHashMap<>();
 
+    /**
+     * The {@link DirectoryStreamStateHandle} for shared state directories, 
one for each subtask.
+     */
+    private final Map<SubtaskKey, DirectoryStreamStateHandle> 
managedSharedStateDirHandles =
+            new ConcurrentHashMap<>();
+
     /**
      * The private state files are merged across subtasks, there is only one 
directory for
      * merged-files within one TM per job.
      */
     protected Path managedExclusiveStateDir;
 
+    /**
+     * The {@link DirectoryStreamStateHandle} for private state directory, one 
for each task
+     * manager.
+     */
+    protected DirectoryStreamStateHandle managedExclusiveStateDirHandle;
+
     public FileMergingSnapshotManagerBase(
             String id, long maxFileSize, PhysicalFilePool.Type filePoolType, 
Executor ioExecutor) {
         this.id = id;
@@ -152,6 +167,9 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
         Path managedExclusivePath = new Path(taskOwnedStateDir, id);
         createManagedDirectory(managedExclusivePath);
         this.managedExclusiveStateDir = managedExclusivePath;
+        this.managedExclusiveStateDirHandle =
+                DirectoryStreamStateHandle.forPathWithZeroSize(
+                        new File(managedExclusivePath.getPath()).toPath());
         this.writeBufferSize = writeBufferSize;
     }
 
@@ -162,6 +180,10 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
         if (!managedSharedStateDir.containsKey(subtaskKey)) {
             createManagedDirectory(managedPath);
             managedSharedStateDir.put(subtaskKey, managedPath);
+            managedSharedStateDirHandles.put(
+                    subtaskKey,
+                    DirectoryStreamStateHandle.forPathWithZeroSize(
+                            new File(managedPath.getPath()).toPath()));
         }
     }
 
@@ -477,6 +499,16 @@ public abstract class FileMergingSnapshotManagerBase 
implements FileMergingSnaps
         }
     }
 
+    @Override
+    public DirectoryStreamStateHandle getManagedDirStateHandle(
+            SubtaskKey subtaskKey, CheckpointedStateScope scope) {
+        if (scope.equals(CheckpointedStateScope.SHARED)) {
+            return managedSharedStateDirHandles.get(subtaskKey);
+        } else {
+            return managedExclusiveStateDirHandle;
+        }
+    }
+
     static boolean shouldSyncAfterClosingLogicalFile(FileSystem fileSystem) {
         // Currently, we do file sync regardless of the file system.
         // TODO: Determine whether do file sync more wisely. Add an interface 
to FileSystem if
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
index d81f7b3a1c0..793c8f3e9b3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackendSnapshotStrategy.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import 
org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
+import 
org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
+import 
org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation;
 import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
 import org.apache.flink.util.CollectionUtil;
 
@@ -121,7 +124,17 @@ class DefaultOperatorStateBackendSnapshotStrategy
 
         if (registeredBroadcastStatesDeepCopies.isEmpty()
                 && registeredOperatorStatesDeepCopies.isEmpty()) {
-            return snapshotCloseableRegistry -> SnapshotResult.empty();
+            if (streamFactory instanceof FsMergingCheckpointStorageLocation) {
+                FsMergingCheckpointStorageLocation location =
+                        (FsMergingCheckpointStorageLocation) streamFactory;
+                return snapshotCloseableRegistry ->
+                        SnapshotResult.of(
+                                
EmptyFileMergingOperatorStreamStateHandle.create(
+                                        location.getExclusiveStateHandle(),
+                                        location.getSharedStateHandle()));
+            } else {
+                return snapshotCloseableRegistry -> SnapshotResult.empty();
+            }
         }
 
         return (snapshotCloseableRegistry) -> {
@@ -204,7 +217,17 @@ class DefaultOperatorStateBackendSnapshotStrategy
             if (snapshotCloseableRegistry.unregisterCloseable(localOut)) {
                 StreamStateHandle stateHandle = localOut.closeAndGetHandle();
                 if (stateHandle != null) {
-                    retValue = new 
OperatorStreamStateHandle(writtenStatesMetaData, stateHandle);
+                    retValue =
+                            streamFactory instanceof 
FsMergingCheckpointStorageLocation
+                                    ? new FileMergingOperatorStreamStateHandle(
+                                            
((FsMergingCheckpointStorageLocation) streamFactory)
+                                                    .getExclusiveStateHandle(),
+                                            
((FsMergingCheckpointStorageLocation) streamFactory)
+                                                    .getSharedStateHandle(),
+                                            writtenStatesMetaData,
+                                            stateHandle)
+                                    : new OperatorStreamStateHandle(
+                                            writtenStatesMetaData, 
stateHandle);
                 }
                 return SnapshotResult.of(retValue);
             } else {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java
new file mode 100644
index 00000000000..dacebae4c10
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.DirectoryStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Path;
+import java.util.Optional;
+
+/** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
+public class DirectoryStreamStateHandle extends DirectoryStateHandle 
implements StreamStateHandle {
+
+    private static final long serialVersionUID = 1L;
+
+    public DirectoryStreamStateHandle(@Nonnull Path directory, long 
directorySize) {
+        super(directory, directorySize);
+    }
+
+    @Override
+    public FSDataInputStream openInputStream() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Optional<byte[]> asBytesIfInMemory() {
+        return Optional.empty();
+    }
+
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(getDirectory().toString());
+    }
+
+    public SharedStateRegistryKey createStateRegistryKey() {
+        return new SharedStateRegistryKey(getDirectory().toUri().toString());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        DirectoryStreamStateHandle that = (DirectoryStreamStateHandle) o;
+
+        return getDirectory().equals(that.getDirectory());
+    }
+
+    @Override
+    public String toString() {
+        return "DirectoryStreamStateHandle{" + "directory=" + getDirectory() + 
'}';
+    }
+
+    /**
+     * Return a {@link DirectoryStreamStateHandle} with zero size, which 
usually used to be
+     * registered to {@link 
org.apache.flink.runtime.state.SharedStateRegistry} to track the life
+     * cycle of the directory, therefore a fake size is provided.
+     *
+     * @param directory the directory.
+     * @return DirectoryStreamStateHandle with zero size.
+     */
+    public static DirectoryStreamStateHandle forPathWithZeroSize(@Nonnull Path 
directory) {
+        return new DirectoryStreamStateHandle(directory, 0);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java
new file mode 100644
index 00000000000..6cbc9555e6d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptyFileMergingOperatorStreamStateHandle.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * An empty {@link FileMergingOperatorStreamStateHandle} that is only used as 
a placeholder to
+ * prevent file merging directory from being deleted.
+ */
+public class EmptyFileMergingOperatorStreamStateHandle
+        extends FileMergingOperatorStreamStateHandle {
+
+    private static final long serialVersionUID = 1L;
+
+    public EmptyFileMergingOperatorStreamStateHandle(
+            DirectoryStreamStateHandle taskOwnedDirHandle,
+            DirectoryStreamStateHandle sharedDirHandle,
+            Map<String, StateMetaInfo> stateNameToPartitionOffsets,
+            StreamStateHandle delegateStateHandle) {
+        super(
+                taskOwnedDirHandle,
+                sharedDirHandle,
+                stateNameToPartitionOffsets,
+                delegateStateHandle);
+    }
+
+    /**
+     * Create an empty {@link EmptyFileMergingOperatorStreamStateHandle}.
+     *
+     * @param taskownedDirHandle the directory where operator state is stored.
+     * @param sharedDirHandle the directory where shared state is stored.
+     */
+    public static EmptyFileMergingOperatorStreamStateHandle create(
+            DirectoryStreamStateHandle taskownedDirHandle,
+            DirectoryStreamStateHandle sharedDirHandle) {
+        final Map<String, OperatorStateHandle.StateMetaInfo> 
writtenStatesMetaData =
+                Collections.emptyMap();
+        return new EmptyFileMergingOperatorStreamStateHandle(
+                taskownedDirHandle,
+                sharedDirHandle,
+                writtenStatesMetaData,
+                EmptySegmentFileStateHandle.INSTANCE);
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java
new file mode 100644
index 00000000000..44b487fd53b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointedStateScope;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+
+/** An empty {@link SegmentFileStateHandle} that is only used as a 
placeholder. */
+public class EmptySegmentFileStateHandle extends SegmentFileStateHandle {
+    private static final long serialVersionUID = 1L;
+
+    public static final EmptySegmentFileStateHandle INSTANCE =
+            new EmptySegmentFileStateHandle(
+                    new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE);
+
+    private EmptySegmentFileStateHandle(
+            Path filePath, long startPos, long stateSize, 
CheckpointedStateScope scope) {
+        super(filePath, startPos, stateSize, scope);
+    }
+
+    @Override
+    public FSDataInputStream openInputStream() throws IOException {
+        throw new UnsupportedEncodingException(
+                "Cannot open input stream from an 
EmptySegmentFileStateHandle.");
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
new file mode 100644
index 00000000000..ada378f2d03
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/FileMergingOperatorStreamStateHandle.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filemerging;
+
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryKey;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link OperatorStreamStateHandle} that works for file merging checkpoints.
+ *
+ * <p>Operator states are stored in `taskownd/` dir when file merging is 
enabled. When an operator
+ * state dir is not referenced by any checkpoint, {@link SharedStateRegistry} 
will discard it. The
+ * shared subtask dir of fire merging is also tracked by {@link
+ * FileMergingOperatorStreamStateHandle}.
+ *
+ * <p>The shared subtask dir of file merging is created when task 
initialization, which will be
+ * discarded when no checkpoint refer to it.
+ */
+public class FileMergingOperatorStreamStateHandle extends 
OperatorStreamStateHandle
+        implements CompositeStateHandle {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(FileMergingOperatorStreamStateHandle.class);
+
+    /** The directory handle of file merging under 'taskowed/', one for each 
job. */
+    private final DirectoryStreamStateHandle taskOwnedDirHandle;
+
+    /**
+     * The directory handle of file merging under 'shared/', one for each 
subtask.
+     *
+     * @see 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager the 
layout of
+     *     file merging checkpoint directory.
+     */
+    private final DirectoryStreamStateHandle sharedDirHandle;
+
+    private transient SharedStateRegistry sharedStateRegistry;
+
+    public FileMergingOperatorStreamStateHandle(
+            DirectoryStreamStateHandle taskOwnedDirHandle,
+            DirectoryStreamStateHandle sharedDirHandle,
+            Map<String, StateMetaInfo> stateNameToPartitionOffsets,
+            StreamStateHandle delegateStateHandle) {
+        super(stateNameToPartitionOffsets, delegateStateHandle);
+        this.taskOwnedDirHandle = taskOwnedDirHandle;
+        this.sharedDirHandle = sharedDirHandle;
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointId) {
+        Preconditions.checkState(
+                sharedStateRegistry != stateRegistry,
+                "The state handle has already registered its shared states to 
the given registry.");
+
+        sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
+
+        LOG.trace(
+                "Registering FileMergingOperatorStreamStateHandle for 
checkpoint {} from backend.",
+                checkpointId);
+        stateRegistry.registerReference(
+                new SharedStateRegistryKey(
+                        
getDelegateStateHandle().getStreamStateHandleID().getKeyString()),
+                getDelegateStateHandle(),
+                checkpointId);
+
+        stateRegistry.registerReference(
+                taskOwnedDirHandle.createStateRegistryKey(), 
taskOwnedDirHandle, checkpointId);
+        stateRegistry.registerReference(
+                sharedDirHandle.createStateRegistryKey(), sharedDirHandle, 
checkpointId);
+    }
+
+    @Override
+    public void discardState() throws Exception {
+        SharedStateRegistry registry = this.sharedStateRegistry;
+        final boolean isRegistered = (registry != null);
+
+        LOG.trace(
+                "Discarding FileMergingOperatorStreamStateHandle (registered = 
{}) from backend.",
+                isRegistered);
+
+        try {
+            getDelegateStateHandle().discardState();
+        } catch (Exception e) {
+            LOG.warn("Could not properly discard directory state handle.", e);
+        }
+    }
+
+    @Override
+    public long getCheckpointedSize() {
+        return getDelegateStateHandle().getStateSize();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        FileMergingOperatorStreamStateHandle that = 
(FileMergingOperatorStreamStateHandle) o;
+
+        return super.equals(that)
+                && taskOwnedDirHandle.equals(that.taskOwnedDirHandle)
+                && sharedDirHandle.equals(that.sharedDirHandle);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + Objects.hashCode(taskOwnedDirHandle);
+        result = 31 * result + Objects.hashCode(sharedDirHandle);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "FileMergingOperatorStreamStateHandle{"
+                + super.toString()
+                + ", taskOwnedDirHandle="
+                + taskOwnedDirHandle
+                + ", sharedDirHandle="
+                + sharedDirHandle
+                + '}';
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SegmentFileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
similarity index 96%
rename from 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SegmentFileStateHandle.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
index 0dce9304f78..2a3aa2f49fd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/SegmentFileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java
@@ -15,11 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.checkpoint.filemerging;
+package org.apache.flink.runtime.state.filemerging;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.PhysicalStateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
@@ -138,7 +139,7 @@ public class SegmentFileStateHandle implements 
StreamStateHandle {
 
         SegmentFileStateHandle that = (SegmentFileStateHandle) o;
 
-        return super.equals(that)
+        return filePath.equals(that.filePath)
                 && startPos == that.startPos
                 && stateSize == that.stateSize
                 && scope.equals(that.scope);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java
index b367d1724a9..688fad8c2ed 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStream.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.Path;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
-import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java
index d485a3dfdfe..f367094bb8b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
 
 import java.io.IOException;
 import java.util.List;
@@ -84,6 +85,16 @@ public class FsMergingCheckpointStorageLocation extends 
FsCheckpointStorageLocat
         return backwardsConvertor.get();
     }
 
+    public DirectoryStreamStateHandle getExclusiveStateHandle() {
+        return fileMergingSnapshotManager.getManagedDirStateHandle(
+                subtaskKey, CheckpointedStateScope.EXCLUSIVE);
+    }
+
+    public DirectoryStreamStateHandle getSharedStateHandle() {
+        return fileMergingSnapshotManager.getManagedDirStateHandle(
+                subtaskKey, CheckpointedStateScope.SHARED);
+    }
+
     @Override
     public boolean canFastDuplicate(StreamStateHandle stateHandle, 
CheckpointedStateScope scope)
             throws IOException {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
index 475c13b5445..cc6c53793df 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint.filemerging;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
index d6553060e2e..3147e668ef1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
index 4128e5e7cd3..340bb917d41 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint.filemerging;
 
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import 
org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
 
 import org.junit.jupiter.api.Test;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 7f70163ea29..2122303545b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -30,12 +30,21 @@ import 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
 import org.apache.flink.runtime.execution.Environment;
+import 
org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
+import 
org.apache.flink.runtime.state.filesystem.FsMergingCheckpointStorageLocation;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
@@ -47,6 +56,7 @@ import org.apache.flink.util.concurrent.FutureUtils;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
@@ -64,6 +74,8 @@ import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.apache.flink.core.fs.Path.fromLocalFile;
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
@@ -418,6 +430,62 @@ class OperatorStateBackendTest {
         assertThat(stateHandle).isNull();
     }
 
+    @Test
+    void testFileMergingSnapshotEmpty(@TempDir File tmpFolder) throws 
Exception {
+        final AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
+        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
+
+        Environment env = createMockEnvironment();
+        final OperatorStateBackend operatorStateBackend =
+                abstractStateBackend.createOperatorStateBackend(
+                        new OperatorStateBackendParametersImpl(
+                                env, "testOperator", emptyStateHandles, 
cancelStreamRegistry));
+
+        Path checkpointBaseDir = new Path(tmpFolder.toString());
+        Path sharedStateDir =
+                new Path(
+                        checkpointBaseDir,
+                        
AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR);
+        Path taskOwnedStateDir =
+                new Path(
+                        checkpointBaseDir,
+                        
AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR);
+
+        final FileMergingSnapshotManager.SubtaskKey subtaskKey =
+                new FileMergingSnapshotManager.SubtaskKey("opId", 1, 1);
+        LocalFileSystem fs = getSharedInstance();
+        CheckpointStorageLocationReference cslReference =
+                AbstractFsCheckpointStorageAccess.encodePathAsReference(
+                        fromLocalFile(fs.pathToFile(checkpointBaseDir)));
+        FileMergingSnapshotManager snapshotManager =
+                createFileMergingSnapshotManager(
+                        checkpointBaseDir, sharedStateDir, taskOwnedStateDir, 
subtaskKey);
+        CheckpointStreamFactory streamFactory =
+                new FsMergingCheckpointStorageLocation(
+                        subtaskKey,
+                        fs,
+                        checkpointBaseDir,
+                        sharedStateDir,
+                        taskOwnedStateDir,
+                        cslReference,
+                        1024,
+                        1024,
+                        snapshotManager,
+                        0);
+
+        RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
+                operatorStateBackend.snapshot(
+                        0L,
+                        0L,
+                        streamFactory,
+                        CheckpointOptions.forCheckpointWithDefaultLocation());
+
+        SnapshotResult<OperatorStateHandle> snapshotResult =
+                FutureUtils.runIfNotDoneAndGet(snapshot);
+        OperatorStateHandle stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
+        
assertThat(stateHandle).isInstanceOf(FileMergingOperatorStreamStateHandle.class);
+    }
+
     @Test
     void testSnapshotBroadcastStateWithEmptyOperatorState() throws Exception {
         final AbstractStateBackend abstractStateBackend = new 
MemoryStateBackend(4096);
@@ -1047,4 +1115,21 @@ class OperatorStateBackendTest {
                 new OperatorStateBackendParametersImpl(
                         env, "testOperator", toRestore, new 
CloseableRegistry()));
     }
+
+    private FileMergingSnapshotManager createFileMergingSnapshotManager(
+            Path checkpointBaseDir,
+            Path sharedStateDir,
+            Path taskOwnedStateDir,
+            SubtaskKey subtaskKey) {
+        FileMergingSnapshotManager mgr =
+                new FileMergingSnapshotManagerBuilder(
+                                "test-1", 
FileMergingType.MERGE_WITHIN_CHECKPOINT)
+                        .build();
+
+        mgr.initFileSystem(
+                getSharedInstance(), checkpointBaseDir, sharedStateDir, 
taskOwnedStateDir, 1024);
+
+        mgr.registerSubtaskForSharedStates(subtaskKey);
+        return mgr;
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 38553a29e9e..a26830c18be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -22,23 +22,34 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.execution.RestoreMode;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import 
org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl;
+import 
org.apache.flink.runtime.state.filemerging.EmptyFileMergingOperatorStreamStateHandle;
+import 
org.apache.flink.runtime.state.filemerging.FileMergingOperatorStreamStateHandle;
+import 
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.UUID;
 
+import static org.apache.flink.core.fs.local.LocalFileSystem.getSharedInstance;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
 import static 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
 import static 
org.apache.flink.runtime.state.ChangelogTestUtils.ChangelogStateHandleWrapper;
@@ -255,6 +266,81 @@ class SharedStateRegistryTest {
                 .containsExactly(1L);
     }
 
+    @Test
+    void testFireMergingOperatorStateRegister(@TempDir File tmpFolder) throws 
IOException {
+        SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
+
+        Path checkpointBaseDir = new Path(tmpFolder.toString());
+        Path sharedStateDir =
+                new Path(
+                        checkpointBaseDir,
+                        
AbstractFsCheckpointStorageAccess.CHECKPOINT_SHARED_STATE_DIR);
+        Path taskOwnedStateDir =
+                new Path(
+                        checkpointBaseDir,
+                        
AbstractFsCheckpointStorageAccess.CHECKPOINT_TASK_OWNED_STATE_DIR);
+        final FileMergingSnapshotManager.SubtaskKey subtaskKey =
+                new FileMergingSnapshotManager.SubtaskKey("opId", 1, 2);
+        FileMergingSnapshotManager snapshotManager =
+                createFileMergingSnapshotManager(
+                        checkpointBaseDir, sharedStateDir, taskOwnedStateDir);
+        snapshotManager.registerSubtaskForSharedStates(subtaskKey);
+        FileMergingOperatorStreamStateHandle handle1 =
+                EmptyFileMergingOperatorStreamStateHandle.create(
+                        snapshotManager.getManagedDirStateHandle(
+                                subtaskKey, CheckpointedStateScope.EXCLUSIVE),
+                        snapshotManager.getManagedDirStateHandle(
+                                subtaskKey, CheckpointedStateScope.SHARED));
+
+        handle1.registerSharedStates(sharedStateRegistry, 1);
+        LocalFileSystem fs = getSharedInstance();
+        assertThat(
+                        fs.exists(
+                                snapshotManager.getManagedDir(
+                                        subtaskKey, 
CheckpointedStateScope.EXCLUSIVE)))
+                .isTrue();
+        assertThat(
+                        fs.exists(
+                                snapshotManager.getManagedDir(
+                                        subtaskKey, 
CheckpointedStateScope.SHARED)))
+                .isTrue();
+        sharedStateRegistry.checkpointCompleted(1);
+        sharedStateRegistry.unregisterUnusedState(1);
+        assertThat(
+                        fs.exists(
+                                snapshotManager.getManagedDir(
+                                        subtaskKey, 
CheckpointedStateScope.EXCLUSIVE)))
+                .isTrue();
+        assertThat(
+                        fs.exists(
+                                snapshotManager.getManagedDir(
+                                        subtaskKey, 
CheckpointedStateScope.SHARED)))
+                .isTrue();
+        sharedStateRegistry.unregisterUnusedState(2);
+        assertThat(
+                        fs.exists(
+                                snapshotManager.getManagedDir(
+                                        subtaskKey, 
CheckpointedStateScope.EXCLUSIVE)))
+                .isFalse();
+        assertThat(
+                        fs.exists(
+                                snapshotManager.getManagedDir(
+                                        subtaskKey, 
CheckpointedStateScope.SHARED)))
+                .isFalse();
+    }
+
+    private FileMergingSnapshotManager createFileMergingSnapshotManager(
+            Path checkpointBaseDir, Path sharedStateDir, Path 
taskOwnedStateDir) {
+        FileMergingSnapshotManager mgr =
+                new FileMergingSnapshotManagerBuilder(
+                                "test-1", 
FileMergingType.MERGE_WITHIN_CHECKPOINT)
+                        .build();
+        mgr.initFileSystem(
+                getSharedInstance(), checkpointBaseDir, sharedStateDir, 
taskOwnedStateDir, 1024);
+
+        return mgr;
+    }
+
     private void registerInitialCheckpoint(
             SharedStateRegistry sharedStateRegistry,
             String stateId,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java
index 837bbb1efbb..fba8653bdcd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.OutputStreamAndPath;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile;
-import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateOutputStream;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 import org.apache.flink.util.Preconditions;
 
 import org.junit.Before;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
index d7796cf7f4f..1eb0be9e2ab 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocationTest.java
@@ -25,9 +25,9 @@ import org.apache.flink.core.fs.local.LocalFileSystem;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManagerBuilder;
 import org.apache.flink.runtime.checkpoint.filemerging.FileMergingType;
-import org.apache.flink.runtime.checkpoint.filemerging.SegmentFileStateHandle;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
 
 import org.junit.Before;
 import org.junit.Rule;

Reply via email to