This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
     new 7b62ef6e3c [flink] Add descriptive operator names and enable parallel file listing in orphan files cleanup (#6958)
7b62ef6e3c is described below
commit 7b62ef6e3c29ce1170320f50574612f60195c325
Author: zhoulii <[email protected]>
AuthorDate: Mon Jan 12 21:14:10 2026 +0800
    [flink] Add descriptive operator names and enable parallel file listing in orphan files cleanup (#6958)
---
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 152 +++++++++++++--------
1 file changed, 98 insertions(+), 54 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index a98282f45d..bbf1c2170e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CloseableIterator;
@@ -64,7 +65,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static org.apache.flink.util.Preconditions.checkState;
@@ -109,6 +109,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
// here, and subsequently, we will not count their orphan files.
DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
env.fromCollection(branches)
+ .name("branch-source")
.process(
                        new ProcessFunction<String, Tuple2<Long, Long>>() {
@Override
@@ -128,6 +129,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
deletedFilesLenInBytes.get()));
}
})
+ .name("clean-branch-snapshot")
.keyBy(tuple -> 1)
.reduce(
(ReduceFunction<Tuple2<Long, Long>>)
@@ -136,7 +138,9 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
value1.f0 + value2.f0,
value1.f1 + value2.f1))
.setParallelism(1)
-                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
+ .name("aggregate-branch-snapshot-deletion")
+                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1))
+ .name("branch-snapshot-deletion-result");
// branch and manifest file
final OutputTag<Tuple2<String, String>> manifestOutputTag =
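
For reference, the keyBy-on-a-constant plus reduce chain above is the standard
DataStream way to fold all partial (deleted-count, deleted-bytes) pairs into a
single global total. A standalone sketch with illustrative sample values:

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    import java.util.Arrays;

    public class GlobalSumSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            env.fromCollection(Arrays.asList(Tuple2.of(3L, 100L), Tuple2.of(2L, 50L)))
                    .keyBy(t -> 1) // constant key: every record lands in the same group
                    .reduce(
                            (ReduceFunction<Tuple2<Long, Long>>)
                                    (a, b) -> Tuple2.of(a.f0 + b.f0, a.f1 + b.f1))
                    .setParallelism(1)
                    .print(); // emits running totals; the last record is the final sum
            env.execute("global-sum-sketch");
        }
    }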
@@ -144,6 +148,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
SingleOutputStreamOperator<String> usedManifestFiles =
env.fromCollection(branches)
+ .name("branch-source")
.process(
                        new ProcessFunction<String, Tuple2<String, String>>() {
@Override
@@ -158,6 +163,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
}
}
})
+ .name("collect-snapshots")
.rebalance()
.process(
                        new ProcessFunction<Tuple2<String, String>, String>() {
@@ -180,14 +186,15 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
collectWithoutDataFile(
                                        branch, snapshot, out::collect, manifestConsumer);
}
- });
+ })
+ .name("collect-manifests");
DataStream<String> usedFiles =
usedManifestFiles
.getSideOutput(manifestOutputTag)
.keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
.transform(
- "datafile-reader",
+ "collect-used-files",
STRING_TYPE_INFO,
                        new BoundedOneInputOperator<Tuple2<String, String>, String>() {
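
The manifestOutputTag consumed above, like the emptyDirOutputTag introduced
below, follows Flink's side-output pattern: an OutputTag created as an
anonymous subclass (so the element type survives erasure), ctx.output(...)
inside a ProcessFunction, and getSideOutput(...) on the resulting operator.
A minimal sketch with illustrative names and data:

    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import java.util.Arrays;

    public class SideOutputSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // Anonymous subclass so the generic element type survives erasure.
            final OutputTag<String> emptyTag = new OutputTag<String>("empty-input") {};

            SingleOutputStreamOperator<Integer> lengths =
                    env.fromCollection(Arrays.asList("a", "", "abc"))
                            .process(
                                    new ProcessFunction<String, Integer>() {
                                        @Override
                                        public void processElement(
                                                String s, Context ctx, Collector<Integer> out) {
                                            if (s.isEmpty()) {
                                                ctx.output(emptyTag, s); // side output
                                            } else {
                                                out.collect(s.length()); // main output
                                            }
                                        }
                                    })
                            .name("split-empty-strings");

            lengths.print(); // main stream
            lengths.getSideOutput(emptyTag).print(); // side stream
            env.execute("side-output-sketch");
        }
    }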
@@ -235,20 +242,101 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
});
usedFiles = usedFiles.union(usedManifestFiles);
- DataStream<Tuple2<String, Long>> candidates =
+
+        final OutputTag<Path> emptyDirOutputTag = new OutputTag<Path>("empty-dir-output") {};
+ SingleOutputStreamOperator<Tuple2<String, Long>> candidates =
                env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
.process(
-                                new ProcessFunction<Integer, Tuple2<String, Long>>() {
+ new ProcessFunction<Integer, String>() {
@Override
public void processElement(
Integer i,
-                                            ProcessFunction<Integer, Tuple2<String, Long>>.Context
+                                            ProcessFunction<Integer, String>.Context ctx,
+ Collector<String> out) {
+ FileStorePathFactory pathFactory =
+ table.store().pathFactory();
+ listPaimonFileDirs(
+ table.fullName(),
+                                                            pathFactory.manifestPath().toString(),
+                                                            pathFactory.indexPath().toString(),
+                                                            pathFactory.statisticsPath().toString(),
+                                                            pathFactory.dataFilePath().toString(),
+                                                            partitionKeysNum,
+                                                            table.coreOptions().dataFileExternalPaths())
+ .stream()
+ .map(Path::toUri)
+ .map(Object::toString)
+ .forEach(out::collect);
+ }
+ })
+ .name("list-dirs")
+ .forceNonParallel()
+ .process(
+                        new ProcessFunction<String, Tuple2<String, Long>>() {
+ @Override
+ public void processElement(
+ String dir,
+                                    ProcessFunction<String, Tuple2<String, Long>>.Context
                                             ctx,
                                     Collector<Tuple2<String, Long>> out) {
- listPaimonFilesForTable(out);
+ Path dirPath = new Path(dir);
+                                            List<FileStatus> files = tryBestListingDirs(dirPath);
+ for (FileStatus file : files) {
+ if (oldEnough(file)) {
+ out.collect(
+ Tuple2.of(
+                                                                    file.getPath().toUri().toString(),
+                                                                    file.getLen()));
+ }
+ }
+ if (files.isEmpty()) {
+                                                ctx.output(emptyDirOutputTag, dirPath);
+ }
}
})
- .setParallelism(1);
+ .name("collect-candidate-files");
+
+ candidates
+ .getSideOutput(emptyDirOutputTag)
+ .transform(
+ "clean-empty-dirs",
+ STRING_TYPE_INFO,
+ new BoundedOneInputOperator<Path, String>() {
+
+ private Set<Path> emptyDirs = new HashSet<>();
+
+ @Override
+                            public void processElement(StreamRecord<Path> element) {
+ emptyDirs.add(element.getValue());
+ }
+
+ @Override
+ public void endInput() throws IOException {
+ // delete empty dir
+ while (!emptyDirs.isEmpty()) {
+ Set<Path> newEmptyDir = new HashSet<>();
+ for (Path emptyDir : emptyDirs) {
+ try {
+                                            if (fileIO.delete(emptyDir, false)) {
+                                                LOG.info("Clean empty dir: {}", emptyDir);
+                                                output.collect(
+                                                        new StreamRecord<>(emptyDir.toString()));
+                                                // recursive cleaning
+                                                newEmptyDir.add(emptyDir.getParent());
+                                            }
+                                        } catch (IOException ignored) {
+                                            LOG.warn("Clean empty dir failed: {}", emptyDir);
+ }
+ }
+ emptyDirs = newEmptyDir;
+ }
+ }
+ })
+ .forceNonParallel()
+ .sinkTo(new DiscardingSink<>())
+ .name("end")
+ .setParallelism(1)
+ .setMaxParallelism(1);
DataStream<CleanOrphanFilesResult> deleted =
usedFiles
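
The endInput() loop above deletes directories bottom-up: every directory that
was actually deleted promotes its parent into the next round, because removing
the last child may have emptied the parent. The same loop in isolation,
sketched against java.nio.file instead of Paimon's FileIO (which returns false
rather than throwing when a directory is non-empty); the real operator only
ever sees directories produced by the table's own listing:

    import java.io.IOException;
    import java.nio.file.DirectoryNotEmptyException;
    import java.nio.file.Files;
    import java.nio.file.NoSuchFileException;
    import java.nio.file.Path;
    import java.util.HashSet;
    import java.util.Set;

    public final class EmptyDirSweep {
        /** Deletes the given empty directories, then retries parents that may have emptied. */
        static void sweep(Set<Path> emptyDirs) {
            while (!emptyDirs.isEmpty()) {
                Set<Path> next = new HashSet<>();
                for (Path dir : emptyDirs) {
                    try {
                        Files.delete(dir); // succeeds only if the directory is empty
                        if (dir.getParent() != null) {
                            next.add(dir.getParent()); // parent may have become empty
                        }
                    } catch (DirectoryNotEmptyException | NoSuchFileException ignored) {
                        // still in use or already gone: stop walking up from here
                    } catch (IOException e) {
                        System.err.println("Clean empty dir failed: " + dir);
                    }
                }
                emptyDirs = next;
            }
        }
    }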
@@ -256,7 +344,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
.connect(
                                candidates.keyBy(pathAndSize -> new Path(pathAndSize.f0).getName()))
.transform(
- "files_join",
+ "join-used-and-candidate-files",
TypeInformation.of(CleanOrphanFilesResult.class),
new BoundedTwoInputOperator<
                                        String, Tuple2<String, Long>, CleanOrphanFilesResult>() {
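
The renamed operator joins the used-file stream against the candidate stream,
both keyed by file name, and emits candidates that no used file references.
Paimon's BoundedTwoInputOperator drains the used-file input completely (via
InputSelection) before reading candidates; the toy CoProcessFunction below
shows only the join shape and, unlike the real operator, is sensitive to
arrival order:

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
    import org.apache.flink.util.Collector;

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class UsedCandidateJoinSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> used = env.fromCollection(Arrays.asList("f1", "f2"));
            DataStream<Tuple2<String, Long>> candidates =
                    env.fromCollection(
                            Arrays.asList(Tuple2.of("f2", 10L), Tuple2.of("f3", 20L)));

            used.connect(candidates)
                    .process(
                            new CoProcessFunction<String, Tuple2<String, Long>, String>() {
                                private final Set<String> usedNames = new HashSet<>();

                                @Override
                                public void processElement1(
                                        String name, Context ctx, Collector<String> out) {
                                    usedNames.add(name); // buffer every referenced file name
                                }

                                @Override
                                public void processElement2(
                                        Tuple2<String, Long> candidate,
                                        Context ctx,
                                        Collector<String> out) {
                                    if (!usedNames.contains(candidate.f0)) {
                                        out.collect(candidate.f0); // unreferenced: an orphan
                                    }
                                }
                            })
                    .setParallelism(1) // one instance so the set sees every used name
                    .print();
            env.execute("join-sketch");
        }
    }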
@@ -323,50 +411,6 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
return deleted;
}
- private void listPaimonFilesForTable(Collector<Tuple2<String, Long>> out) {
- FileStorePathFactory pathFactory = table.store().pathFactory();
- List<String> dirs =
- listPaimonFileDirs(
- table.fullName(),
- pathFactory.manifestPath().toString(),
- pathFactory.indexPath().toString(),
- pathFactory.statisticsPath().toString(),
- pathFactory.dataFilePath().toString(),
- partitionKeysNum,
- table.coreOptions().dataFileExternalPaths())
- .stream()
- .map(Path::toUri)
- .map(Object::toString)
- .collect(Collectors.toList());
- Set<Path> emptyDirs = new HashSet<>();
- for (String dir : dirs) {
- Path dirPath = new Path(dir);
- List<FileStatus> files = tryBestListingDirs(dirPath);
- for (FileStatus file : files) {
- if (oldEnough(file)) {
-                    out.collect(new Tuple2<>(file.getPath().toUri().toString(), file.getLen()));
- }
- }
- if (files.isEmpty()) {
- emptyDirs.add(dirPath);
- }
- }
-
- // delete empty dir
- while (!emptyDirs.isEmpty()) {
- Set<Path> newEmptyDir = new HashSet<>();
- for (Path emptyDir : emptyDirs) {
- try {
- fileIO.delete(emptyDir, false);
- // recursive cleaning
- newEmptyDir.add(emptyDir.getParent());
- } catch (IOException ignored) {
- }
- }
- emptyDirs = newEmptyDir;
- }
- }
-
public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
StreamExecutionEnvironment env,
Catalog catalog,