XComp commented on a change in pull request #18979:
URL: https://github.com/apache/flink/pull/18979#discussion_r819627740



##########
File path: flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
##########
@@ -671,16 +672,111 @@ public boolean exists(final Path f) throws IOException {
     }
 
     /**
-     * Delete a file.
+     * Deletes a file or directory.
      *
-     * @param f the path to delete
-     * @param recursive if path is a directory and set to <code>true</code>, 
the directory is
-     *     deleted else throws an exception. In case of a file the recursive 
can be set to either
-     *     <code>true</code> or <code>false</code>
-     * @return <code>true</code> if delete is successful, <code>false</code> 
otherwise
-     * @throws IOException
+     * @param path The path to the file that shall be deleted.
+     * @param recursive {@code true} will trigger a recursive deletion; {@code 
false} will only try
+     *     to delete the object referenced by the passed {@code path}. In the 
latter case, a {@code
+     *     DirectoryNotEmptyException} is thrown if the {@code path} refers to 
a non-empty
+     *     directory.
+     * @return {@code true} if the file or directory doesn't exist after this 
call was processed.
+     * @throws java.nio.file.DirectoryNotEmptyException if the {@code path} 
refers to a non-empty
+     *     directory.
+     * @throws IOException If an error occurred while deleting the data 
indicating that the deletion
+     *     wasn't successful.
+     */
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        if (recursive) {
+            deleteRecursively(path);
+        } else {
+            deleteFile(path);
+        }
+
+        return true;
+    }
+
+    private void deleteRecursively(Path path) throws IOException {
+        if (!exists(path)) {
+            return;
+        }
+
+        final FileStatus[] containingFiles = listStatus(path);
+        if (containingFiles == null) {
+            LOG.warn(
+                    "No files could be retrieved even though the path was 
marked as existing. This is an indication for a bug in the underlying 
FileSystem implementation and should be reported. It won't affect the 
processing of this run, though.");
+            return;
+        }
+
+        IOException exception = null;
+        for (FileStatus fileStatus : containingFiles) {
+            final Path childPath = fileStatus.getPath();
+
+            if (childPath.equals(path)) {
+                // we need to make sure that we delete the path itself after 
all its content is
+                // deleted
+                continue;
+            }
+
+            try {
+                deleteRecursively(childPath);
+            } catch (IOException e) {
+                exception = ExceptionUtils.firstOrSuppressed(e, exception);
+            }
+        }
+
+        if (exception == null) {
+            deleteFile(path);
+        } else {
+            throw exception;
+        }
+    }
+
+    /**
+     * Deletes the object referenced by the passed {@code path}.
+     *
+     * @param path The path referring to the object that shall be deleted.
+     * @throws java.nio.file.DirectoryNotEmptyException if the {@code path} 
refers to a non-empty
+     *     directory.
+     * @throws IOException if an error occurred while deleting the file other 
than the {@code path}
+     *     referring to a non-empty directory.
+     */
+    private void deleteFile(Path path) throws IOException {
+        if (isDirectoryWithContent(path)) {
+            throw new DirectoryNotEmptyException(path.getPath());
+        }
+
+        try {
+            tryToDelete(path);
+        } catch (IOException e) {
+            if (exists(path)) {
+                throw e;
+            }
+            return;
+        }
+
+        if (exists(path)) {
+            throw new IOException(path.getPath() + " could not be deleted for 
unknown reasons.");
+        }
+    }
+
+    /**
+     * Checks whether a given {@code path} refers to a non-empty directory.
+     *
+     * @param path The path referring to the directory in question.
+     * @return {@code true}, if the {@code path} refers to a non-empty 
directory; otherwise {@code
+     *     false}.
+     * @throws IOException If an error occurred while accessing the underlying 
file system.
+     */
+    protected abstract boolean isDirectoryWithContent(Path path) throws 
IOException;
+
+    /**
+     * Deletes the file or empty directory referenced by the given {@code 
path}. Shouldn't do
+     * anything if the object doesn't exist.
+     *
+     * @param path The path to the object that is subject for deletion.
+     * @throws IOException If an error occurs while accessing the underlying 
file system.
      */
-    public abstract boolean delete(Path f, boolean recursive) throws 
IOException;

Review comment:
       ...the JavaDoc was made more specific, though, which could be considered 
a change in contract.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to