Zakelly commented on code in PR #27185:
URL: https://github.com/apache/flink/pull/27185#discussion_r2488527067
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/StringifiedForStFileSystem.java:
##########
@@ -30,10 +30,10 @@
*/
public class StringifiedForStFileSystem {
- private ForStFlinkFileSystem fileSystem;
+ private ForStFileSystemDecorator fileSystem;
public StringifiedForStFileSystem(ForStFlinkFileSystem fileSystem) {
- this.fileSystem = fileSystem;
+ this.fileSystem = new ForStFileSystemDecorator(fileSystem);
Review Comment:
emm..... why this?
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFileSystemUtils.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** Utils for ForStFileSystem. */
+public class ForStFileSystemUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(ForStFileSystemUtils.class);
+
+ private static final String DUMMY_DIR_NAME = "_dummy_dir";
+
+ public static boolean isParentDir(@Nullable Path path, String dir) {
+ if (path == null) {
+ return false;
+ }
+ return isParentDir(path.toString(), dir);
+ }
+
+ public static boolean isParentDir(String path, String dir) {
+ if (dir.isEmpty()) {
+ return false;
+ }
+ if (dir.charAt(dir.length() - 1) == '/') {
+ return path.startsWith(dir);
+ } else {
+ return (path.startsWith(dir + "/"));
+ }
+ }
+
+ public static ForStFlinkFileSystem tryDecorate(ForStFlinkFileSystem
fileSystem)
+ throws IOException {
+ try {
+ return isDummyMkdirEnabled(fileSystem)
+ ? new ForStFileSystemDecorator(fileSystem)
+ : fileSystem;
+ } catch (IOException e) {
+ LOG.info("Cannot decorate ForStFlinkFileSystem", e);
+ }
+ return fileSystem;
+ }
+
+ private static boolean isDummyMkdirEnabled(ForStFlinkFileSystem
fileSystem) throws IOException {
+ // check if the underlying FileSystem uses a dummy mkdir implementation
+ Path dummyDir = new Path(fileSystem.getRemoteBase(), DUMMY_DIR_NAME);
Review Comment:
UUID should be added to the probe path, to avoid concurrency issue
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFileSystemDecorator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/** A wrapper of {@link ForStFlinkFileSystem} to adapt ForSt to various
FileSystems. */
+public class ForStFileSystemDecorator extends ForStFlinkFileSystem {
+ private static final Logger LOG =
LoggerFactory.getLogger(ForStFileSystemDecorator.class);
+
+ /**
+ * Whether the underlying FileSystem uses a dummy mkdir implementation,
meaning it does not
+ * actually create the directory when mkdir() completes. We determine this
condition during the
+ * first call to mkdir() that returns true. When this occurs, we record
the path of the
+ * should-be-created directory in dummyCreatedDirPaths and subsequently
return true for
+ * existence checks.
+ */
+ private final Set<Path> dummyCreatedDirPaths = new HashSet<>();
Review Comment:
Please move this part to the javadoc of this. And I thought it's no need for
'dummy' in the variable name.
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFileSystemDecorator.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/** A wrapper of {@link ForStFlinkFileSystem} to adapt ForSt to various
FileSystems. */
+public class ForStFileSystemDecorator extends ForStFlinkFileSystem {
+ private static final Logger LOG =
LoggerFactory.getLogger(ForStFileSystemDecorator.class);
+
+ /**
+ * Whether the underlying FileSystem uses a dummy mkdir implementation,
meaning it does not
+ * actually create the directory when mkdir() completes. We determine this
condition during the
+ * first call to mkdir() that returns true. When this occurs, we record
the path of the
+ * should-be-created directory in dummyCreatedDirPaths and subsequently
return true for
+ * existence checks.
+ */
+ private final Set<Path> dummyCreatedDirPaths = new HashSet<>();
+
+ private static final String DUMMY_DIR_NAME = "_dummy_dir";
Review Comment:
seems unused?
##########
flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/ForStFileSystemUtils.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.forst.fs;
+
+import org.apache.flink.core.fs.Path;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+/** Utils for ForStFileSystem. */
+public class ForStFileSystemUtils {
+ private static final Logger LOG =
LoggerFactory.getLogger(ForStFileSystemUtils.class);
+
+ private static final String DUMMY_DIR_NAME = "_dummy_dir";
+
+ public static boolean isParentDir(@Nullable Path path, String dir) {
+ if (path == null) {
+ return false;
+ }
+ return isParentDir(path.toString(), dir);
+ }
+
+ public static boolean isParentDir(String path, String dir) {
+ if (dir.isEmpty()) {
+ return false;
+ }
+ if (dir.charAt(dir.length() - 1) == '/') {
+ return path.startsWith(dir);
+ } else {
+ return (path.startsWith(dir + "/"));
+ }
+ }
+
+ public static ForStFlinkFileSystem tryDecorate(ForStFlinkFileSystem
fileSystem)
+ throws IOException {
+ try {
+ return isDummyMkdirEnabled(fileSystem)
+ ? new ForStFileSystemDecorator(fileSystem)
+ : fileSystem;
+ } catch (IOException e) {
+ LOG.info("Cannot decorate ForStFlinkFileSystem", e);
+ }
+ return fileSystem;
+ }
+
+ private static boolean isDummyMkdirEnabled(ForStFlinkFileSystem
fileSystem) throws IOException {
Review Comment:
Well, the 'dummy' seems not clear. How about using 'Incomplete'?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]