AlexYinHan commented on code in PR #27042:
URL: https://github.com/apache/flink/pull/27042#discussion_r2425154334


##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/ForStStateBackend.java:
##########
@@ -795,45 +794,53 @@ private ReadableConfig mergeConfigurableOptions(ReadableConfig base, ReadableCon
         return configuration;
     }
 
-    Tuple2<Path, Path> getForStBasePath(String operatorIdentifier, Environment env) {
+    ForStPathContainer createForStPathContainer(
+            String operatorIdentifier, Environment env, boolean forceLocal) {
         String opChildPath =
                 String.format(
                         "op_%s_attempt_%s",
                         operatorIdentifier, env.getTaskInfo().getAttemptNumber());
 
-        Path localBasePath =
-                new Path(
-                        new File(new File(getNextStoragePath(), jobId.toHexString()), opChildPath)
-                                .getAbsolutePath());
+        File localJobFile = new File(getNextStoragePath(), jobId.toHexString());
+        Path localJobPath = new Path(localJobFile.getPath());
+        Path localBasePath = new Path(new File(localJobFile, opChildPath).getAbsolutePath());
+
+        if (forceLocal) {
+            return ForStPathContainer.ofLocal(localJobPath, localBasePath);
+        }
+
+        Path remoteJobPath = null;
         Path remoteBasePath = null;
         if (remoteForStDirectory != null) {
-            remoteBasePath =
-                    new Path(new Path(remoteForStDirectory, jobId.toHexString()), opChildPath);
+            remoteJobPath = new Path(remoteForStDirectory, jobId.toHexString());
+            remoteBasePath = new Path(remoteJobPath, opChildPath);
         } else if (remoteShareWithCheckpoint) {
             if (env.getCheckpointStorageAccess() instanceof FsCheckpointStorageAccess) {
-                Path sharedStateDirectory =
-                        ((FsCheckpointStorageAccess) env.getCheckpointStorageAccess())
-                                .getSharedStateDirectory();
+                FsCheckpointStorageAccess fsCheckpointStorageAccess =
+                        (FsCheckpointStorageAccess) env.getCheckpointStorageAccess();
+                remoteJobPath = fsCheckpointStorageAccess.getCheckpointsDirectory();

Review Comment:
   ```remoteJobPath``` is the job-level parent directory, i.e. the directory whose last path component is the job ID.
   
   For example:
   - ```remoteShareWithCheckpoint == true```: if the ForSt base directory is ```/checkpoints/jobid-xxx/shared/op_yyy__1_1__attempt_0```, then ```remoteJobPath``` is ```/checkpoints/jobid-xxx```.
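
   The relationship between the two directories in this example can be sketched as follows. This is only an illustration: it uses `java.nio.file.Path` instead of Flink's own `Path` class, the class and helper names are hypothetical, and the `shared` sub-directory name is taken from the example path above rather than from the Flink code.

   ```java
   import java.nio.file.Path;
   import java.nio.file.Paths;

   public class RemoteJobPathSketch {

       // Hypothetical helper: the per-operator base directory is assumed to be
       // <remoteJobPath>/shared/<opChildPath>, as in the example above.
       static Path remoteBasePath(Path remoteJobPath, String opChildPath) {
           return remoteJobPath.resolve("shared").resolve(opChildPath);
       }

       // Hypothetical helper: going up two levels from the base directory
       // (past the operator directory and "shared") yields the job directory.
       static Path remoteJobPath(Path remoteBasePath) {
           return remoteBasePath.getParent().getParent();
       }

       public static void main(String[] args) {
           Path jobPath = Paths.get("/checkpoints/jobid-xxx");
           Path basePath = remoteBasePath(jobPath, "op_yyy__1_1__attempt_0");
           System.out.println("base: " + basePath);
           System.out.println("job:  " + remoteJobPath(basePath));
       }
   }
   ```

   Under these assumptions, the job directory is always two levels above the ForSt base directory when the remote directory is shared with checkpoints.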
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

