Zakelly commented on code in PR #24933:
URL: https://github.com/apache/flink/pull/24933#discussion_r1639554722


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -153,6 +152,22 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
     /** The current space statistic, updated on file creation/deletion. */
     protected SpaceStat spaceStat;
 
+    /**
+     * This map records shared state dirs which need be clean up when the FileMergingSnapshotManager
+     * close. The key is SubtaskKey the shared state dir belong to, and the value is the count of
+     * the ongoing checkpoint which reference the dir. If a checkpoint which reference the shared
+     * dir complete, the corresponding shared dir will be removed from this map, because the
+     * ownership is transferred to JobManager.
+     */
+    private final Map<SubtaskKey, Long> sharedDirToCleanRef = new ConcurrentHashMap<>();

Review Comment:
   I'd suggest a new class that bundles `DirectoryStreamStateHandle` and its reference count.
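
   A minimal sketch of what such a bundling class could look like. The name `DirHandleWithRefCount` and its methods are hypothetical, and the handle type is left generic so the example stays self-contained instead of depending on `DirectoryStreamStateHandle`.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical holder that pairs a directory handle with its reference count. */
final class DirHandleWithRefCount<H> {
    private final H handle;
    private final AtomicLong refCount = new AtomicLong();

    DirHandleWithRefCount(H handle) {
        this.handle = handle;
    }

    H getHandle() {
        return handle;
    }

    /** Adds one reference and returns the new count. */
    long retain() {
        return refCount.incrementAndGet();
    }

    /** Drops one reference (never below zero); returns true if none remain. */
    boolean release() {
        return refCount.updateAndGet(c -> Math.max(0L, c - 1)) == 0;
    }

    long getRefCount() {
        return refCount.get();
    }
}
```

   With a class along these lines, the map could hold one value object per `SubtaskKey` rather than a bare `Long`.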



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##########
@@ -19,23 +19,31 @@
 package org.apache.flink.runtime.state.filemerging;
 
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.PhysicalStateHandleID;
 import org.apache.flink.runtime.state.SharedStateRegistryKey;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import javax.annotation.Nonnull;
 
-import java.nio.file.Path;
 import java.util.Optional;
 
 /** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */
-public class DirectoryStreamStateHandle extends DirectoryStateHandle implements StreamStateHandle {
+public class DirectoryStreamStateHandle implements StreamStateHandle {
 
     private static final long serialVersionUID = 1L;
 
-    public DirectoryStreamStateHandle(@Nonnull Path directory, long directorySize) {
-        super(directory, directorySize);
+    /** The path that describes the directory, as a string, to be serializable. */
+    private final String directoryString;
+
+    /** Transient path cache, to avoid re-parsing the string. */
+    private transient Path directory;

Review Comment:
   Seems unnecessary? We could just hold the `Path` in a `final` field, with no need for the `String directoryString`.
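
   A rough sketch of the simpler layout, assuming the path type is itself `Serializable`. Here `java.net.URI` is only a stand-in for the path type, and `DirHandleSketch` and `roundTrip` are hypothetical names used for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.URI;

/** Hypothetical handle that keeps the directory in a single final field. */
final class DirHandleSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Stand-in for the directory path; no String copy or transient cache. */
    private final URI directory;

    DirHandleSketch(URI directory) {
        this.directory = directory;
    }

    URI getDirectory() {
        return directory;
    }

    /** Serializes and deserializes the handle to show the field survives. */
    static DirHandleSketch roundTrip(DirHandleSketch handle) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(handle);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DirHandleSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```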



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java:
##########
@@ -80,6 +81,12 @@ public FsMergingCheckpointStorageLocation(
                                 reference,
                                 fileStateSizeThreshold,
                                 writeBufferSize);
+
+        // Record file-merging managed dir reference when FsMergingCheckpointStorageLocation create.
+        if (fileMergingSnapshotManager instanceof FileMergingSnapshotManagerBase) {
+            ((FileMergingSnapshotManagerBase) fileMergingSnapshotManager)
+                    .recordManagedDirReference(subtaskKey, checkpointId);

Review Comment:
   How about making this a method of the `FileMergingSnapshotManager` interface? And how about renaming it to `retainManagedDirectory`?
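
   A minimal sketch of the idea: once the method is declared on the interface, the caller no longer needs the `instanceof` check and cast. All types below are hypothetical, trimmed-down stand-ins (with `String` in place of `SubtaskKey`), not the actual Flink classes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the FileMergingSnapshotManager interface. */
interface SnapshotManagerSketch {
    /** Tells the manager that a checkpoint may reference the managed directory. */
    void retainManagedDirectory(String subtaskKey, long checkpointId);
}

/** Hypothetical implementation that only records the calls it receives. */
final class RecordingManager implements SnapshotManagerSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void retainManagedDirectory(String subtaskKey, long checkpointId) {
        calls.add(subtaskKey + "@" + checkpointId);
    }
}

/** Hypothetical stand-in for the storage location that triggers the call. */
final class StorageLocationSketch {
    StorageLocationSketch(SnapshotManagerSketch manager, String subtaskKey, long checkpointId) {
        // Works for any implementation; no instanceof check or cast required.
        manager.retainManagedDirectory(subtaskKey, checkpointId);
    }
}
```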



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -496,14 +497,34 @@ protected void discardCheckpoint(long checkpointId) throws IOException {
     //  Checkpoint Listener
     // ------------------------------------------------------------------------
 
+    /**
+     * {@link FsMergingCheckpointStorageLocation} use this method let the file merging manager know
+     * an ongoing checkpoint may reference the managed dirs.
+     */
+    public void notifyCheckpointStart(SubtaskKey subtaskKey, long checkpointId) {
+        managedSharedStateDirHandles
+                .getOrDefault(subtaskKey, NON_HANDLE_INSTANCE)
+                .increaseRefCountWhenCheckpointStart(checkpointId);

Review Comment:
   Use `computeIfPresent`?
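
   For illustration, a small sketch of the `computeIfPresent` pattern. It updates an entry only when the key exists, so no sentinel default such as `NON_HANDLE_INSTANCE` is needed. The class `RefCountSketch` and its `String` keys and `Long` counters are hypothetical simplifications of the real map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical reference counter keyed by subtask, using computeIfPresent. */
final class RefCountSketch {
    private final Map<String, Long> refCounts = new ConcurrentHashMap<>();

    /** Registers a subtask whose directory is managed, starting at zero references. */
    void register(String subtaskKey) {
        refCounts.putIfAbsent(subtaskKey, 0L);
    }

    /** Increments the count only if the subtask is registered; otherwise a no-op. */
    void notifyCheckpointStart(String subtaskKey) {
        refCounts.computeIfPresent(subtaskKey, (key, count) -> count + 1);
    }

    /** Returns the current count, or null if the subtask was never registered. */
    Long refCount(String subtaskKey) {
        return refCounts.get(subtaskKey);
    }
}
```

   On a `ConcurrentHashMap`, `computeIfPresent` runs the remapping function atomically per key, so the read-modify-write needs no extra locking.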



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##########
@@ -19,23 +19,26 @@
 package org.apache.flink.runtime.state.filemerging;
 
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.DirectoryStateHandle;
 import org.apache.flink.runtime.state.PhysicalStateHandleID;
 import org.apache.flink.runtime.state.SharedStateRegistryKey;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import javax.annotation.Nonnull;
 
-import java.nio.file.Path;
 import java.util.Optional;
 
 /** Wrap {@link DirectoryStateHandle} to a {@link StreamStateHandle}. */

Review Comment:
   Maybe you should update this Javadoc.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/DirectoryStreamStateHandle.java:
##########
@@ -85,6 +92,18 @@ public String toString() {
      * @return DirectoryStreamStateHandle with zero size.
      */
     public static DirectoryStreamStateHandle forPathWithZeroSize(@Nonnull Path directory) {

Review Comment:
   How about naming it `of`?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java:
##########
@@ -496,14 +497,34 @@ protected void discardCheckpoint(long checkpointId) throws IOException {
     //  Checkpoint Listener
     // ------------------------------------------------------------------------
 
+    /**
+     * {@link FsMergingCheckpointStorageLocation} use this method let the file merging manager know
+     * an ongoing checkpoint may reference the managed dirs.
+     */
+    public void notifyCheckpointStart(SubtaskKey subtaskKey, long checkpointId) {

Review Comment:
   I'd suggest making this a method of the `FileMergingSnapshotManager` interface.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

