This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 400e417823 [flink] Clean empty directory in FlinkOrphanFilesClean (#5521)
400e417823 is described below

commit 400e41782330c51892602574b040b9a7c90ec05e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 23 20:26:13 2025 +0800

    [flink] Clean empty directory in FlinkOrphanFilesClean (#5521)
---
 .../apache/paimon/operation/OrphanFilesClean.java  |  33 ++----
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java | 131 ++++++++-------------
 .../action/RemoveOrphanFilesActionITCaseBase.java  |  19 ++-
 3 files changed, 78 insertions(+), 105 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index e768f87e59..5d7e5b6d39 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -169,35 +169,22 @@ public abstract class OrphanFilesClean implements Serializable {
 
     private List<Pair<Path, Long>> tryGetNonSnapshotFiles(
             Path snapshotDirectory, Predicate<FileStatus> fileStatusFilter) {
-        return listPathWithFilter(
-                fileIO, snapshotDirectory, fileStatusFilter, 
nonSnapshotFileFilter());
+        return listPathWithFilter(snapshotDirectory, fileStatusFilter, 
nonSnapshotFileFilter());
     }
 
     private List<Pair<Path, Long>> tryGetNonChangelogFiles(
             Path changelogDirectory, Predicate<FileStatus> fileStatusFilter) {
-        return listPathWithFilter(
-                fileIO, changelogDirectory, fileStatusFilter, 
nonChangelogFileFilter());
+        return listPathWithFilter(changelogDirectory, fileStatusFilter, 
nonChangelogFileFilter());
     }
 
-    private static List<Pair<Path, Long>> listPathWithFilter(
-            FileIO fileIO,
-            Path directory,
-            Predicate<FileStatus> fileStatusFilter,
-            Predicate<Path> fileFilter) {
-        try {
-            FileStatus[] statuses = fileIO.listStatus(directory);
-            if (statuses == null) {
-                return Collections.emptyList();
-            }
-
-            return Arrays.stream(statuses)
-                    .filter(fileStatusFilter)
-                    .filter(status -> fileFilter.test(status.getPath()))
-                    .map(status -> Pair.of(status.getPath(), status.getLen()))
-                    .collect(Collectors.toList());
-        } catch (IOException ignored) {
-            return Collections.emptyList();
-        }
+    private List<Pair<Path, Long>> listPathWithFilter(
+            Path directory, Predicate<FileStatus> fileStatusFilter, 
Predicate<Path> fileFilter) {
+        List<FileStatus> statuses = tryBestListingDirs(directory);
+        return statuses.stream()
+                .filter(fileStatusFilter)
+                .filter(status -> fileFilter.test(status.getPath()))
+                .map(status -> Pair.of(status.getPath(), status.getLen()))
+                .collect(Collectors.toList());
     }
 
     private static Predicate<Path> nonSnapshotFileFilter() {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index e678d8f684..a98282f45d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -35,10 +35,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.ExecutionOptions;
@@ -58,7 +56,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -118,8 +115,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                     public void processElement(
                                             String branch,
                                             ProcessFunction<String, 
Tuple2<Long, Long>>.Context ctx,
-                                            Collector<Tuple2<Long, Long>> out)
-                                            throws Exception {
+                                            Collector<Tuple2<Long, Long>> out) 
{
                                         AtomicLong deletedFilesCount = new 
AtomicLong(0);
                                         AtomicLong deletedFilesLenInBytes = 
new AtomicLong(0);
                                         cleanBranchSnapshotDir(
@@ -239,88 +235,17 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                 });
 
         usedFiles = usedFiles.union(usedManifestFiles);
-        FileStorePathFactory pathFactory = table.store().pathFactory();
-        List<Tuple7<String, String, String, String, String, Integer, String>> 
tablePaths =
-                Arrays.asList(
-                        new Tuple7<>(
-                                table.fullName(),
-                                pathFactory.manifestPath().toString(),
-                                pathFactory.indexPath().toString(),
-                                pathFactory.statisticsPath().toString(),
-                                pathFactory.dataFilePath().toString(),
-                                partitionKeysNum,
-                                
table.store().options().dataFileExternalPaths()));
         DataStream<Tuple2<String, Long>> candidates =
-                env.fromCollection(
-                                tablePaths,
-                                TypeInformation.of(
-                                        new TypeHint<
-                                                Tuple7<
-                                                        String,
-                                                        String,
-                                                        String,
-                                                        String,
-                                                        String,
-                                                        Integer,
-                                                        String>>() {}))
+                env.fromCollection(Collections.singletonList(1), 
TypeInformation.of(Integer.class))
                         .process(
-                                new ProcessFunction<
-                                        Tuple7<
-                                                String,
-                                                String,
-                                                String,
-                                                String,
-                                                String,
-                                                Integer,
-                                                String>,
-                                        Tuple2<String, Long>>() {
+                                new ProcessFunction<Integer, Tuple2<String, 
Long>>() {
                                     @Override
                                     public void processElement(
-                                            Tuple7<
-                                                            String,
-                                                            String,
-                                                            String,
-                                                            String,
-                                                            String,
-                                                            Integer,
-                                                            String>
-                                                    paths,
-                                            ProcessFunction<
-                                                                    Tuple7<
-                                                                            
String,
-                                                                            
String,
-                                                                            
String,
-                                                                            
String,
-                                                                            
String,
-                                                                            
Integer,
-                                                                            
String>,
-                                                                    
Tuple2<String, Long>>
-                                                            .Context
+                                            Integer i,
+                                            ProcessFunction<Integer, 
Tuple2<String, Long>>.Context
                                                     ctx,
                                             Collector<Tuple2<String, Long>> 
out) {
-                                        List<String> dirs =
-                                                listPaimonFileDirs(
-                                                                paths.f0, 
paths.f1, paths.f2,
-                                                                paths.f3, 
paths.f4, paths.f5,
-                                                                paths.f6)
-                                                        .stream()
-                                                        .map(Path::toUri)
-                                                        .map(Object::toString)
-                                                        
.collect(Collectors.toList());
-                                        for (String dir : dirs) {
-                                            for (FileStatus fileStatus :
-                                                    tryBestListingDirs(new 
Path(dir))) {
-                                                if (oldEnough(fileStatus)) {
-                                                    out.collect(
-                                                            new Tuple2(
-                                                                    fileStatus
-                                                                            
.getPath()
-                                                                            
.toUri()
-                                                                            
.toString(),
-                                                                    
fileStatus.getLen()));
-                                                }
-                                            }
-                                        }
+                                        listPaimonFilesForTable(out);
                                     }
                                 })
                         .setParallelism(1);
@@ -398,6 +323,50 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
         return deleted;
     }
 
+    private void listPaimonFilesForTable(Collector<Tuple2<String, Long>> out) {
+        FileStorePathFactory pathFactory = table.store().pathFactory();
+        List<String> dirs =
+                listPaimonFileDirs(
+                                table.fullName(),
+                                pathFactory.manifestPath().toString(),
+                                pathFactory.indexPath().toString(),
+                                pathFactory.statisticsPath().toString(),
+                                pathFactory.dataFilePath().toString(),
+                                partitionKeysNum,
+                                table.coreOptions().dataFileExternalPaths())
+                        .stream()
+                        .map(Path::toUri)
+                        .map(Object::toString)
+                        .collect(Collectors.toList());
+        Set<Path> emptyDirs = new HashSet<>();
+        for (String dir : dirs) {
+            Path dirPath = new Path(dir);
+            List<FileStatus> files = tryBestListingDirs(dirPath);
+            for (FileStatus file : files) {
+                if (oldEnough(file)) {
+                    out.collect(new 
Tuple2<>(file.getPath().toUri().toString(), file.getLen()));
+                }
+            }
+            if (files.isEmpty()) {
+                emptyDirs.add(dirPath);
+            }
+        }
+
+        // delete empty dir
+        while (!emptyDirs.isEmpty()) {
+            Set<Path> newEmptyDir = new HashSet<>();
+            for (Path emptyDir : emptyDirs) {
+                try {
+                    fileIO.delete(emptyDir, false);
+                    // recursive cleaning
+                    newEmptyDir.add(emptyDir.getParent());
+                } catch (IOException ignored) {
+                }
+            }
+            emptyDirs = newEmptyDir;
+        }
+    }
+
     public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 54e211ed3b..50fbd7dac1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -103,7 +103,7 @@ public abstract class RemoveOrphanFilesActionITCaseBase extends ActionITCaseBase
     public void testRunWithoutException(boolean isNamedArgument) throws 
Exception {
         assumeTrue(!isNamedArgument || supportNamedArgument());
 
-        createTableAndWriteData(tableName);
+        FileStoreTable table = createTableAndWriteData(tableName);
 
         List<String> args =
                 new ArrayList<>(
@@ -158,6 +158,23 @@ public abstract class RemoveOrphanFilesActionITCaseBase extends ActionITCaseBase
         ImmutableList<Row> actualDeleteFile = 
ImmutableList.copyOf(executeSQL(withOlderThan));
 
         assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), 
Row.of("2"));
+
+        // test clean empty directories
+        FileIO fileIO = table.fileIO();
+        Path location = table.location();
+        Path bucketDir = new Path(location, "bucket-0");
+
+        // delete snapshots and clean orphan files
+        fileIO.delete(new Path(location, "snapshot"), true);
+        ImmutableList.copyOf(executeSQL(withOlderThan));
+        assertThat(fileIO.exists(bucketDir)).isTrue();
+        assertThat(fileIO.listDirectories(bucketDir)).isEmpty();
+
+        // clean empty directories
+        ImmutableList.copyOf(executeSQL(withOlderThan));
+        assertThat(fileIO.exists(bucketDir)).isFalse();
+        // table should not be deleted
+        assertThat(fileIO.exists(location)).isTrue();
     }
 
     @ParameterizedTest

Reply via email to