This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 400e417823 [flink] Clean empty directory in FlinkOrphanFilesClean (#5521)
400e417823 is described below
commit 400e41782330c51892602574b040b9a7c90ec05e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Apr 23 20:26:13 2025 +0800
[flink] Clean empty directory in FlinkOrphanFilesClean (#5521)
---
.../apache/paimon/operation/OrphanFilesClean.java | 33 ++----
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 131 ++++++++-------------
.../action/RemoveOrphanFilesActionITCaseBase.java | 19 ++-
3 files changed, 78 insertions(+), 105 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index e768f87e59..5d7e5b6d39 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -169,35 +169,22 @@ public abstract class OrphanFilesClean implements
Serializable {
private List<Pair<Path, Long>> tryGetNonSnapshotFiles(
Path snapshotDirectory, Predicate<FileStatus> fileStatusFilter) {
- return listPathWithFilter(
- fileIO, snapshotDirectory, fileStatusFilter,
nonSnapshotFileFilter());
+ return listPathWithFilter(snapshotDirectory, fileStatusFilter, nonSnapshotFileFilter());
}
private List<Pair<Path, Long>> tryGetNonChangelogFiles(
Path changelogDirectory, Predicate<FileStatus> fileStatusFilter) {
- return listPathWithFilter(
- fileIO, changelogDirectory, fileStatusFilter,
nonChangelogFileFilter());
+ return listPathWithFilter(changelogDirectory, fileStatusFilter, nonChangelogFileFilter());
}
- private static List<Pair<Path, Long>> listPathWithFilter(
- FileIO fileIO,
- Path directory,
- Predicate<FileStatus> fileStatusFilter,
- Predicate<Path> fileFilter) {
- try {
- FileStatus[] statuses = fileIO.listStatus(directory);
- if (statuses == null) {
- return Collections.emptyList();
- }
-
- return Arrays.stream(statuses)
- .filter(fileStatusFilter)
- .filter(status -> fileFilter.test(status.getPath()))
- .map(status -> Pair.of(status.getPath(), status.getLen()))
- .collect(Collectors.toList());
- } catch (IOException ignored) {
- return Collections.emptyList();
- }
+ private List<Pair<Path, Long>> listPathWithFilter(
+ Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
+ List<FileStatus> statuses = tryBestListingDirs(directory);
+ return statuses.stream()
+ .filter(fileStatusFilter)
+ .filter(status -> fileFilter.test(status.getPath()))
+ .map(status -> Pair.of(status.getPath(), status.getLen()))
+ .collect(Collectors.toList());
}
private static Predicate<Path> nonSnapshotFileFilter() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index e678d8f684..a98282f45d 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -35,10 +35,8 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple7;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
@@ -58,7 +56,6 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -118,8 +115,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean
{
public void processElement(
String branch,
ProcessFunction<String,
Tuple2<Long, Long>>.Context ctx,
- Collector<Tuple2<Long, Long>> out)
- throws Exception {
+ Collector<Tuple2<Long, Long>> out)
{
AtomicLong deletedFilesCount = new
AtomicLong(0);
AtomicLong deletedFilesLenInBytes =
new AtomicLong(0);
cleanBranchSnapshotDir(
@@ -239,88 +235,17 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
});
usedFiles = usedFiles.union(usedManifestFiles);
- FileStorePathFactory pathFactory = table.store().pathFactory();
- List<Tuple7<String, String, String, String, String, Integer, String>>
tablePaths =
- Arrays.asList(
- new Tuple7<>(
- table.fullName(),
- pathFactory.manifestPath().toString(),
- pathFactory.indexPath().toString(),
- pathFactory.statisticsPath().toString(),
- pathFactory.dataFilePath().toString(),
- partitionKeysNum,
-
table.store().options().dataFileExternalPaths()));
DataStream<Tuple2<String, Long>> candidates =
- env.fromCollection(
- tablePaths,
- TypeInformation.of(
- new TypeHint<
- Tuple7<
- String,
- String,
- String,
- String,
- String,
- Integer,
- String>>() {}))
+ env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
.process(
- new ProcessFunction<
- Tuple7<
- String,
- String,
- String,
- String,
- String,
- Integer,
- String>,
- Tuple2<String, Long>>() {
+ new ProcessFunction<Integer, Tuple2<String,
Long>>() {
@Override
public void processElement(
- Tuple7<
- String,
- String,
- String,
- String,
- String,
- Integer,
- String>
- paths,
- ProcessFunction<
- Tuple7<
-
String,
-
String,
-
String,
-
String,
-
String,
-
Integer,
-
String>,
-
Tuple2<String, Long>>
- .Context
+ Integer i,
+ ProcessFunction<Integer,
Tuple2<String, Long>>.Context
ctx,
Collector<Tuple2<String, Long>>
out) {
- List<String> dirs =
- listPaimonFileDirs(
- paths.f0,
paths.f1, paths.f2,
- paths.f3,
paths.f4, paths.f5,
- paths.f6)
- .stream()
- .map(Path::toUri)
- .map(Object::toString)
-
.collect(Collectors.toList());
- for (String dir : dirs) {
- for (FileStatus fileStatus :
- tryBestListingDirs(new
Path(dir))) {
- if (oldEnough(fileStatus)) {
- out.collect(
- new Tuple2(
- fileStatus
-
.getPath()
-
.toUri()
-
.toString(),
-
fileStatus.getLen()));
- }
- }
- }
+ listPaimonFilesForTable(out);
}
})
.setParallelism(1);
@@ -398,6 +323,50 @@ public class FlinkOrphanFilesClean extends
OrphanFilesClean {
return deleted;
}
+ private void listPaimonFilesForTable(Collector<Tuple2<String, Long>> out) {
+ FileStorePathFactory pathFactory = table.store().pathFactory();
+ List<String> dirs =
+ listPaimonFileDirs(
+ table.fullName(),
+ pathFactory.manifestPath().toString(),
+ pathFactory.indexPath().toString(),
+ pathFactory.statisticsPath().toString(),
+ pathFactory.dataFilePath().toString(),
+ partitionKeysNum,
+ table.coreOptions().dataFileExternalPaths())
+ .stream()
+ .map(Path::toUri)
+ .map(Object::toString)
+ .collect(Collectors.toList());
+ Set<Path> emptyDirs = new HashSet<>();
+ for (String dir : dirs) {
+ Path dirPath = new Path(dir);
+ List<FileStatus> files = tryBestListingDirs(dirPath);
+ for (FileStatus file : files) {
+ if (oldEnough(file)) {
+ out.collect(new Tuple2<>(file.getPath().toUri().toString(), file.getLen()));
+ }
+ }
+ if (files.isEmpty()) {
+ emptyDirs.add(dirPath);
+ }
+ }
+
+ // delete empty dir
+ while (!emptyDirs.isEmpty()) {
+ Set<Path> newEmptyDir = new HashSet<>();
+ for (Path emptyDir : emptyDirs) {
+ try {
+ fileIO.delete(emptyDir, false);
+ // recursive cleaning
+ newEmptyDir.add(emptyDir.getParent());
+ } catch (IOException ignored) {
+ }
+ }
+ emptyDirs = newEmptyDir;
+ }
+ }
+
public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
StreamExecutionEnvironment env,
Catalog catalog,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 54e211ed3b..50fbd7dac1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -103,7 +103,7 @@ public abstract class RemoveOrphanFilesActionITCaseBase
extends ActionITCaseBase
public void testRunWithoutException(boolean isNamedArgument) throws
Exception {
assumeTrue(!isNamedArgument || supportNamedArgument());
- createTableAndWriteData(tableName);
+ FileStoreTable table = createTableAndWriteData(tableName);
List<String> args =
new ArrayList<>(
@@ -158,6 +158,23 @@ public abstract class RemoveOrphanFilesActionITCaseBase
extends ActionITCaseBase
ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(withOlderThan));
assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
+
+ // test clean empty directories
+ FileIO fileIO = table.fileIO();
+ Path location = table.location();
+ Path bucketDir = new Path(location, "bucket-0");
+
+ // delete snapshots and clean orphan files
+ fileIO.delete(new Path(location, "snapshot"), true);
+ ImmutableList.copyOf(executeSQL(withOlderThan));
+ assertThat(fileIO.exists(bucketDir)).isTrue();
+ assertThat(fileIO.listDirectories(bucketDir)).isEmpty();
+
+ // clean empty directories
+ ImmutableList.copyOf(executeSQL(withOlderThan));
+ assertThat(fileIO.exists(bucketDir)).isFalse();
+ // table should not be deleted
+ assertThat(fileIO.exists(location)).isTrue();
}
@ParameterizedTest