This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b62ef6e3c [flink] Add descriptive operator names and enable parallel file listing in orphan files cleanup (#6958)
7b62ef6e3c is described below

commit 7b62ef6e3c29ce1170320f50574612f60195c325
Author: zhoulii <[email protected]>
AuthorDate: Mon Jan 12 21:14:10 2026 +0800

    [flink] Add descriptive operator names and enable parallel file listing in orphan files cleanup (#6958)
---
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java | 152 +++++++++++++--------
 1 file changed, 98 insertions(+), 54 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index a98282f45d..bbf1c2170e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
 import org.apache.flink.streaming.api.operators.InputSelection;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.CloseableIterator;
@@ -64,7 +65,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -109,6 +109,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
         // here, and subsequently, we will not count their orphan files.
         DataStream<CleanOrphanFilesResult> branchSnapshotDirDeleted =
                 env.fromCollection(branches)
+                        .name("branch-source")
                         .process(
                                 new ProcessFunction<String, Tuple2<Long, Long>>() {
                                     @Override
@@ -128,6 +129,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                                         deletedFilesLenInBytes.get()));
                                     }
                                 })
+                        .name("clean-branch-snapshot")
                         .keyBy(tuple -> 1)
                         .reduce(
                                 (ReduceFunction<Tuple2<Long, Long>>)
@@ -136,7 +138,9 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                                         value1.f0 + value2.f0,
                                                         value1.f1 + value2.f1))
                         .setParallelism(1)
-                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1));
+                        .name("aggregate-branch-snapshot-deletion")
+                        .map(tuple -> new CleanOrphanFilesResult(tuple.f0, tuple.f1))
+                        .name("branch-snapshot-deletion-result");
 
         // branch and manifest file
         final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -144,6 +148,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
 
         SingleOutputStreamOperator<String> usedManifestFiles =
                 env.fromCollection(branches)
+                        .name("branch-source")
                         .process(
                                 new ProcessFunction<String, Tuple2<String, String>>() {
                                     @Override
@@ -158,6 +163,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                         }
                                     }
                                 })
+                        .name("collect-snapshots")
                         .rebalance()
                         .process(
                                 new ProcessFunction<Tuple2<String, String>, String>() {
@@ -180,14 +186,15 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                         collectWithoutDataFile(
                                                 branch, snapshot, out::collect, manifestConsumer);
                                     }
-                                });
+                                })
+                        .name("collect-manifests");
 
         DataStream<String> usedFiles =
                 usedManifestFiles
                         .getSideOutput(manifestOutputTag)
                         .keyBy(tuple2 -> tuple2.f0 + ":" + tuple2.f1)
                         .transform(
-                                "datafile-reader",
+                                "collect-used-files",
                                 STRING_TYPE_INFO,
                                 new BoundedOneInputOperator<Tuple2<String, String>, String>() {
 
@@ -235,20 +242,101 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                 });
 
         usedFiles = usedFiles.union(usedManifestFiles);
-        DataStream<Tuple2<String, Long>> candidates =
+
+        final OutputTag<Path> emptyDirOutputTag = new OutputTag<Path>("empty-dir-output") {};
+        SingleOutputStreamOperator<Tuple2<String, Long>> candidates =
                 env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
                         .process(
-                                new ProcessFunction<Integer, Tuple2<String, Long>>() {
+                                new ProcessFunction<Integer, String>() {
                                     @Override
                                     public void processElement(
                                             Integer i,
-                                            ProcessFunction<Integer, Tuple2<String, Long>>.Context
+                                            ProcessFunction<Integer, String>.Context ctx,
+                                            Collector<String> out) {
+                                        FileStorePathFactory pathFactory =
+                                                table.store().pathFactory();
+                                        listPaimonFileDirs(
+                                                        table.fullName(),
+                                                        pathFactory.manifestPath().toString(),
+                                                        pathFactory.indexPath().toString(),
+                                                        pathFactory.statisticsPath().toString(),
+                                                        pathFactory.dataFilePath().toString(),
+                                                        partitionKeysNum,
+                                                        table.coreOptions().dataFileExternalPaths())
+                                                .stream()
+                                                .map(Path::toUri)
+                                                .map(Object::toString)
+                                                .forEach(out::collect);
+                                    }
+                                })
+                        .name("list-dirs")
+                        .forceNonParallel()
+                        .process(
+                                new ProcessFunction<String, Tuple2<String, Long>>() {
+                                    @Override
+                                    public void processElement(
+                                            String dir,
+                                            ProcessFunction<String, Tuple2<String, Long>>.Context
                                                     ctx,
                                             Collector<Tuple2<String, Long>> out) {
-                                        listPaimonFilesForTable(out);
+                                        Path dirPath = new Path(dir);
+                                        List<FileStatus> files = tryBestListingDirs(dirPath);
+                                        for (FileStatus file : files) {
+                                            if (oldEnough(file)) {
+                                                out.collect(
+                                                        Tuple2.of(
+                                                                file.getPath().toUri().toString(),
+                                                                file.getLen()));
+                                            }
+                                        }
+                                        if (files.isEmpty()) {
+                                            ctx.output(emptyDirOutputTag, dirPath);
+                                        }
                                     }
                                 })
-                        .setParallelism(1);
+                        .name("collect-candidate-files");
+
+        candidates
+                .getSideOutput(emptyDirOutputTag)
+                .transform(
+                        "clean-empty-dirs",
+                        STRING_TYPE_INFO,
+                        new BoundedOneInputOperator<Path, String>() {
+
+                            private Set<Path> emptyDirs = new HashSet<>();
+
+                            @Override
+                            public void processElement(StreamRecord<Path> element) {
+                                emptyDirs.add(element.getValue());
+                            }
+
+                            @Override
+                            public void endInput() throws IOException {
+                                // delete empty dir
+                                while (!emptyDirs.isEmpty()) {
+                                    Set<Path> newEmptyDir = new HashSet<>();
+                                    for (Path emptyDir : emptyDirs) {
+                                        try {
+                                            if (fileIO.delete(emptyDir, false)) {
+                                                LOG.info("Clean empty dir: {}", emptyDir);
+                                                output.collect(
+                                                        new StreamRecord<>(emptyDir.toString()));
+                                                // recursive cleaning
+                                                newEmptyDir.add(emptyDir.getParent());
+                                            }
+                                        } catch (IOException ignored) {
+                                            LOG.warn("Clean empty dir failed: {}", emptyDir);
+                                        }
+                                    }
+                                    emptyDirs = newEmptyDir;
+                                }
+                            }
+                        })
+                .forceNonParallel()
+                .sinkTo(new DiscardingSink<>())
+                .name("end")
+                .setParallelism(1)
+                .setMaxParallelism(1);
 
         DataStream<CleanOrphanFilesResult> deleted =
                 usedFiles
@@ -256,7 +344,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                         .connect(
                                 candidates.keyBy(pathAndSize -> new Path(pathAndSize.f0).getName()))
                         .transform(
-                                "files_join",
+                                "join-used-and-candidate-files",
                                 TypeInformation.of(CleanOrphanFilesResult.class),
                                 new BoundedTwoInputOperator<
                                         String, Tuple2<String, Long>, CleanOrphanFilesResult>() {
@@ -323,50 +411,6 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
         return deleted;
     }
 
-    private void listPaimonFilesForTable(Collector<Tuple2<String, Long>> out) {
-        FileStorePathFactory pathFactory = table.store().pathFactory();
-        List<String> dirs =
-                listPaimonFileDirs(
-                                table.fullName(),
-                                pathFactory.manifestPath().toString(),
-                                pathFactory.indexPath().toString(),
-                                pathFactory.statisticsPath().toString(),
-                                pathFactory.dataFilePath().toString(),
-                                partitionKeysNum,
-                                table.coreOptions().dataFileExternalPaths())
-                        .stream()
-                        .map(Path::toUri)
-                        .map(Object::toString)
-                        .collect(Collectors.toList());
-        Set<Path> emptyDirs = new HashSet<>();
-        for (String dir : dirs) {
-            Path dirPath = new Path(dir);
-            List<FileStatus> files = tryBestListingDirs(dirPath);
-            for (FileStatus file : files) {
-                if (oldEnough(file)) {
-                    out.collect(new Tuple2<>(file.getPath().toUri().toString(), file.getLen()));
-                }
-            }
-            if (files.isEmpty()) {
-                emptyDirs.add(dirPath);
-            }
-        }
-
-        // delete empty dir
-        while (!emptyDirs.isEmpty()) {
-            Set<Path> newEmptyDir = new HashSet<>();
-            for (Path emptyDir : emptyDirs) {
-                try {
-                    fileIO.delete(emptyDir, false);
-                    // recursive cleaning
-                    newEmptyDir.add(emptyDir.getParent());
-                } catch (IOException ignored) {
-                }
-            }
-            emptyDirs = newEmptyDir;
-        }
-    }
-
     public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,

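The empty-directory handling above uses Flink side outputs: an OutputTag routes secondary records out of a ProcessFunction in the same pass that produces the main stream. A minimal sketch of the pattern, assuming a DataStream<String> of directory paths named `dirs` and a hypothetical `listDir` helper (neither is defined in this commit):

    import java.util.List;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Anonymous subclass so Flink can capture the tag's element type.
    final OutputTag<String> emptyDirTag = new OutputTag<String>("empty-dirs") {};

    SingleOutputStreamOperator<String> files =
            dirs.process(
                    new ProcessFunction<String, String>() {
                        @Override
                        public void processElement(
                                String dir, Context ctx, Collector<String> out) {
                            List<String> listed = listDir(dir); // hypothetical helper
                            if (listed.isEmpty()) {
                                ctx.output(emptyDirTag, dir); // route to side output
                            } else {
                                listed.forEach(out::collect); // main output
                            }
                        }
                    });

    // Side outputs are read from the operator, not from the environment.
    DataStream<String> emptyDirs = files.getSideOutput(emptyDirTag);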
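The parallel listing follows a two-stage shape: a non-parallel stage enumerates directories, and a downstream stage lists each directory so the filesystem I/O spreads across subtasks; Flink redistributes records round-robin when parallelism changes between stages. A sketch under those assumptions, with `enumerateDirs` and `listFiles` as hypothetical stand-ins for the Paimon helpers used above:

    // Stage 1: enumerate directories once, on a single subtask.
    DataStream<String> dirs =
            env.fromCollection(Collections.singletonList(1))
                    .process(
                            new ProcessFunction<Integer, String>() {
                                @Override
                                public void processElement(
                                        Integer ignored, Context ctx, Collector<String> out) {
                                    enumerateDirs().forEach(out::collect); // hypothetical
                                }
                            })
                    .name("list-dirs")
                    .forceNonParallel();

    // Stage 2: list each directory at the job's default parallelism.
    DataStream<String> candidateFiles =
            dirs.process(
                            new ProcessFunction<String, String>() {
                                @Override
                                public void processElement(
                                        String dir, Context ctx, Collector<String> out) {
                                    listFiles(dir).forEach(out::collect); // hypothetical
                                }
                            })
                    .name("collect-candidate-files");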
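The cleanup branch ends in a DiscardingSink because its records are only side effects (deleted directories), yet every stream still needs a sink for the topology to be valid. A minimal sketch, assuming a DataStream<String> named `cleanedDirs`:

    import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;

    // DiscardingSink drops every record; it exists purely to terminate the branch.
    cleanedDirs.sinkTo(new DiscardingSink<>())
            .name("end")
            .setParallelism(1);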