This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d57d3d90a [core] add deletedFileTotalSizeInBytes in result of OrphanFilesClean (#4545)
8d57d3d90a is described below

commit 8d57d3d90a945c8705d3acda3e7bf0ef2cb157ef
Author: wangwj <[email protected]>
AuthorDate: Tue Nov 26 13:02:01 2024 +0800

    [core] add deletedFileTotalSizeInBytes in result of OrphanFilesClean (#4545)
---
 .../paimon/operation/CleanOrphanFilesResult.java   | 54 ++++++++++++
 .../paimon/operation/LocalOrphanFilesClean.java    | 64 +++++++++-----
 .../apache/paimon/operation/OrphanFilesClean.java  | 38 +++++++--
 .../org/apache/paimon/utils/SnapshotManager.java   | 10 +--
 .../operation/LocalOrphanFilesCleanTest.java       | 18 ++--
 .../procedure/RemoveOrphanFilesProcedure.java      | 13 ++-
 .../flink/RemoveOrphanFilesActionITCase.java       |  2 +-
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java | 98 +++++++++++++++-------
 .../procedure/RemoveOrphanFilesProcedure.java      | 12 ++-
 .../action/RemoveOrphanFilesActionITCaseBase.java  |  2 +-
 .../procedure/RemoveOrphanFilesProcedure.java      | 18 ++--
 .../spark/orphan/SparkOrphanFilesClean.scala       | 64 ++++++++------
 .../procedure/RemoveOrphanFilesProcedureTest.scala | 30 +++----
 13 files changed, 291 insertions(+), 132 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
new file mode 100644
index 0000000000..5a3bc67f9c
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/CleanOrphanFilesResult.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.fs.Path;
+
+import java.util.List;
+
+/** The result of OrphanFilesClean. */
+public class CleanOrphanFilesResult {
+
+    private List<Path> deletedFilesPath;
+    private final long deletedFileCount;
+    private final long deletedFileTotalLenInBytes;
+
+    public CleanOrphanFilesResult(long deletedFileCount, long deletedFileTotalLenInBytes) {
+        this.deletedFileCount = deletedFileCount;
+        this.deletedFileTotalLenInBytes = deletedFileTotalLenInBytes;
+    }
+
+    public CleanOrphanFilesResult(
+            List<Path> deletedFilesPath, long deletedFileCount, long deletedFileTotalLenInBytes) {
+        this(deletedFileCount, deletedFileTotalLenInBytes);
+        this.deletedFilesPath = deletedFilesPath;
+    }
+
+    public long getDeletedFileCount() {
+        return deletedFileCount;
+    }
+
+    public long getDeletedFileTotalLenInBytes() {
+        return deletedFileTotalLenInBytes;
+    }
+
+    public List<Path> getDeletedFilesPath() {
+        return deletedFilesPath;
+    }
+}
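
(A minimal, hypothetical usage sketch follows — not part of the commit. It assumes
only the constructors and getters defined above; the class name
CleanOrphanFilesResultExample and the literal values are invented for illustration.)

    import org.apache.paimon.fs.Path;
    import org.apache.paimon.operation.CleanOrphanFilesResult;

    import java.util.Collections;

    public class CleanOrphanFilesResultExample {
        public static void main(String[] args) {
            // aggregate-only form, as produced by the distributed cleaners
            CleanOrphanFilesResult totals = new CleanOrphanFilesResult(2, 2048L);

            // form that also carries the deleted paths, as produced by the local cleaner
            CleanOrphanFilesResult detailed =
                    new CleanOrphanFilesResult(
                            Collections.singletonList(new Path("/tmp/orphan-0")), 1, 1024L);

            System.out.println(totals.getDeletedFileCount());           // 2
            System.out.println(totals.getDeletedFileTotalLenInBytes()); // 2048
            System.out.println(detailed.getDeletedFilesPath());         // [/tmp/orphan-0]
        }
    }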
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index a5eea6d650..511c5fc7fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -21,12 +21,12 @@ package org.apache.paimon.operation;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializableConsumer;
 
 import javax.annotation.Nullable;
@@ -47,6 +47,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -68,6 +69,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
 
     private final List<Path> deleteFiles;
 
+    private final AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
+
     private Set<String> candidateDeletes;
 
     public LocalOrphanFilesClean(FileStoreTable table) {
@@ -87,16 +90,18 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
                         table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
     }
 
-    public List<Path> clean() throws IOException, ExecutionException, InterruptedException {
+    public CleanOrphanFilesResult clean()
+            throws IOException, ExecutionException, InterruptedException {
         List<String> branches = validBranches();
 
         // specially handle to clear snapshot dir
-        cleanSnapshotDir(branches, deleteFiles::add);
+        cleanSnapshotDir(branches, deleteFiles::add, deletedFilesLenInBytes::addAndGet);
 
         // delete candidate files
-        Map<String, Path> candidates = getCandidateDeletingFiles();
+        Map<String, Pair<Path, Long>> candidates = getCandidateDeletingFiles();
         if (candidates.isEmpty()) {
-            return deleteFiles;
+            return new CleanOrphanFilesResult(
+                    deleteFiles, deleteFiles.size(), deletedFilesLenInBytes.get());
         }
         candidateDeletes = new HashSet<>(candidates.keySet());
 
@@ -108,12 +113,22 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
 
         // delete unused files
         candidateDeletes.removeAll(usedFiles);
-        candidateDeletes.stream().map(candidates::get).forEach(fileCleaner);
+        candidateDeletes.stream()
+                .map(candidates::get)
+                .forEach(
+                        deleteFileInfo -> {
+                            deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
+                            fileCleaner.accept(deleteFileInfo.getLeft());
+                        });
         deleteFiles.addAll(
-                candidateDeletes.stream().map(candidates::get).collect(Collectors.toList()));
+                candidateDeletes.stream()
+                        .map(candidates::get)
+                        .map(Pair::getLeft)
+                        .collect(Collectors.toList()));
         candidateDeletes.clear();
 
-        return deleteFiles;
+        return new CleanOrphanFilesResult(
+                deleteFiles, deleteFiles.size(), deletedFilesLenInBytes.get());
     }
 
     private void collectWithoutDataFile(
@@ -172,19 +187,20 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
      * Get all the candidate deleting files in the specified directories and filter them by
      * olderThanMillis.
      */
-    private Map<String, Path> getCandidateDeletingFiles() {
+    private Map<String, Pair<Path, Long>> getCandidateDeletingFiles() {
         List<Path> fileDirs = listPaimonFileDirs();
-        Function<Path, List<Path>> processor =
+        Function<Path, List<Pair<Path, Long>>> processor =
                 path ->
                         tryBestListingDirs(path).stream()
                                 .filter(this::oldEnough)
-                                .map(FileStatus::getPath)
+                                .map(status -> Pair.of(status.getPath(), status.getLen()))
                                 .collect(Collectors.toList());
-        Iterator<Path> allPaths = randomlyExecuteSequentialReturn(executor, processor, fileDirs);
-        Map<String, Path> result = new HashMap<>();
-        while (allPaths.hasNext()) {
-            Path next = allPaths.next();
-            result.put(next.getName(), next);
+        Iterator<Pair<Path, Long>> allFilesInfo =
+                randomlyExecuteSequentialReturn(executor, processor, fileDirs);
+        Map<String, Pair<Path, Long>> result = new HashMap<>();
+        while (allFilesInfo.hasNext()) {
+            Pair<Path, Long> fileInfo = allFilesInfo.next();
+            result.put(fileInfo.getLeft().getName(), fileInfo);
         }
         return result;
     }
@@ -197,7 +213,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
             SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
-        List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>();
         List<String> tableNames = Collections.singletonList(tableName);
         if (tableName == null || "*".equals(tableName)) {
             tableNames = catalog.listTables(databaseName);
@@ -214,6 +229,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
                             }
                         };
 
+        List<LocalOrphanFilesClean> orphanFilesCleans = new ArrayList<>(tableNames.size());
         for (String t : tableNames) {
             Identifier identifier = new Identifier(databaseName, t);
             Table table = catalog.getTable(identifier).copy(dynamicOptions);
@@ -230,7 +246,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
         return orphanFilesCleans;
     }
 
-    public static long executeDatabaseOrphanFiles(
+    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             Catalog catalog,
             String databaseName,
             @Nullable String tableName,
@@ -249,15 +265,17 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
 
         ExecutorService executorService =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
-        List<Future<List<Path>>> tasks = new ArrayList<>();
+        List<Future<CleanOrphanFilesResult>> tasks = new ArrayList<>(tableCleans.size());
         for (LocalOrphanFilesClean clean : tableCleans) {
             tasks.add(executorService.submit(clean::clean));
         }
 
-        List<Path> cleanOrphanFiles = new ArrayList<>();
-        for (Future<List<Path>> task : tasks) {
+        long deletedFileCount = 0;
+        long deletedFileTotalLenInBytes = 0;
+        for (Future<CleanOrphanFilesResult> task : tasks) {
             try {
-                cleanOrphanFiles.addAll(task.get());
+                deletedFileCount += task.get().getDeletedFileCount();
+                deletedFileTotalLenInBytes += task.get().getDeletedFileTotalLenInBytes();
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new RuntimeException(e);
@@ -267,6 +285,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
         }
 
         executorService.shutdownNow();
-        return cleanOrphanFiles.size();
+        return new CleanOrphanFilesResult(deletedFileCount, deletedFileTotalLenInBytes);
     }
 }
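
(Sketch, not part of the commit: the loop above folds the per-table futures into one
summary by summing both counters. The same fold in isolation, with invented example
values and a hypothetical class name MergeResultsExample:)

    import org.apache.paimon.operation.CleanOrphanFilesResult;

    import java.util.Arrays;
    import java.util.List;

    public class MergeResultsExample {
        public static void main(String[] args) {
            List<CleanOrphanFilesResult> perTable =
                    Arrays.asList(
                            new CleanOrphanFilesResult(3, 300L),
                            new CleanOrphanFilesResult(1, 50L));
            long deletedFileCount = 0;
            long deletedFileTotalLenInBytes = 0;
            for (CleanOrphanFilesResult r : perTable) {
                deletedFileCount += r.getDeletedFileCount();
                deletedFileTotalLenInBytes += r.getDeletedFileTotalLenInBytes();
            }
            // one merged summary, as returned by executeDatabaseOrphanFiles
            CleanOrphanFilesResult merged =
                    new CleanOrphanFilesResult(deletedFileCount, deletedFileTotalLenInBytes);
            System.out.println(merged.getDeletedFileCount());           // 4
            System.out.println(merged.getDeletedFileTotalLenInBytes()); // 350
        }
    }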
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 869100d9cf..274cdd52fe 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -120,23 +120,47 @@ public abstract class OrphanFilesClean implements Serializable {
         return branches;
     }
 
-    protected void cleanSnapshotDir(List<String> branches, Consumer<Path> deletedFileConsumer) {
+    protected void cleanSnapshotDir(
+            List<String> branches,
+            Consumer<Path> deletedFilesConsumer,
+            Consumer<Long> deletedFilesLenInBytesConsumer) {
         for (String branch : branches) {
             FileStoreTable branchTable = table.switchToBranch(branch);
             SnapshotManager snapshotManager = branchTable.snapshotManager();
 
             // specially handle the snapshot directory
-            List<Path> nonSnapshotFiles = snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
-            nonSnapshotFiles.forEach(fileCleaner);
-            nonSnapshotFiles.forEach(deletedFileConsumer);
+            List<Pair<Path, Long>> nonSnapshotFiles =
+                    snapshotManager.tryGetNonSnapshotFiles(this::oldEnough);
+            nonSnapshotFiles.forEach(
+                    nonSnapshotFile ->
+                            cleanFile(
+                                    nonSnapshotFile,
+                                    deletedFilesConsumer,
+                                    deletedFilesLenInBytesConsumer));
 
             // specially handle the changelog directory
-            List<Path> nonChangelogFiles = snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
-            nonChangelogFiles.forEach(fileCleaner);
-            nonChangelogFiles.forEach(deletedFileConsumer);
+            List<Pair<Path, Long>> nonChangelogFiles =
+                    snapshotManager.tryGetNonChangelogFiles(this::oldEnough);
+            nonChangelogFiles.forEach(
+                    nonChangelogFile ->
+                            cleanFile(
+                                    nonChangelogFile,
+                                    deletedFilesConsumer,
+                                    deletedFilesLenInBytesConsumer));
         }
     }
 
+    private void cleanFile(
+            Pair<Path, Long> deleteFileInfo,
+            Consumer<Path> deletedFilesConsumer,
+            Consumer<Long> deletedFilesLenInBytesConsumer) {
+        Path filePath = deleteFileInfo.getLeft();
+        Long fileSize = deleteFileInfo.getRight();
+        deletedFilesConsumer.accept(filePath);
+        deletedFilesLenInBytesConsumer.accept(fileSize);
+        fileCleaner.accept(filePath);
+    }
+
     protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
         FileStoreTable branchTable = table.switchToBranch(branch);
         SnapshotManager snapshotManager = branchTable.snapshotManager();
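
(Sketch, not part of the commit: the refactored cleanSnapshotDir reports each deleted
file to two consumers — one taking the path, one the byte length — which the callers
back with a list and an AtomicLong. The contract in isolation, with java.lang.String
standing in for Paimon's Path:)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.function.Consumer;

    public class CleanFileSketch {
        // mirrors the private cleanFile(...) helper added above
        static void cleanFile(String path, long size,
                              Consumer<String> deletedFilesConsumer,
                              Consumer<Long> deletedFilesLenInBytesConsumer) {
            deletedFilesConsumer.accept(path);            // record which file was deleted
            deletedFilesLenInBytesConsumer.accept(size);  // accumulate how many bytes
        }

        public static void main(String[] args) {
            List<String> deleted = new ArrayList<>();
            AtomicLong deletedBytes = new AtomicLong();
            cleanFile("snapshot/tmp-1", 128L, deleted::add, deletedBytes::addAndGet);
            System.out.println(deleted.size() + " files, " + deletedBytes.get() + " bytes");
        }
    }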
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index 9a120042ea..cbe33ffaf4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -563,15 +563,15 @@ public class SnapshotManager implements Serializable {
      * Try to get non snapshot files. If any error occurred, just ignore it and return an empty
      * result.
      */
-    public List<Path> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
+    public List<Pair<Path, Long>> tryGetNonSnapshotFiles(Predicate<FileStatus> fileStatusFilter) {
         return listPathWithFilter(snapshotDirectory(), fileStatusFilter, nonSnapshotFileFilter());
     }
 
-    public List<Path> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
+    public List<Pair<Path, Long>> tryGetNonChangelogFiles(Predicate<FileStatus> fileStatusFilter) {
         return listPathWithFilter(changelogDirectory(), fileStatusFilter, nonChangelogFileFilter());
     }
 
-    private List<Path> listPathWithFilter(
+    private List<Pair<Path, Long>> listPathWithFilter(
             Path directory, Predicate<FileStatus> fileStatusFilter, Predicate<Path> fileFilter) {
         try {
             FileStatus[] statuses = fileIO.listStatus(directory);
@@ -581,8 +581,8 @@ public class SnapshotManager implements Serializable {
 
             return Arrays.stream(statuses)
                     .filter(fileStatusFilter)
-                    .map(FileStatus::getPath)
-                    .filter(fileFilter)
+                    .filter(status -> fileFilter.test(status.getPath()))
+                    .map(status -> Pair.of(status.getPath(), status.getLen()))
                     .collect(Collectors.toList());
         } catch (IOException ignored) {
             return Collections.emptyList();
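
(Sketch, not part of the commit: listPathWithFilter now filters on the full FileStatus
and keeps each file's length paired with its path. The same pattern over plain
java.io.File, with Map.Entry standing in for Paimon's Pair and both filter predicates
being illustrative stand-ins:)

    import java.io.File;
    import java.util.AbstractMap.SimpleEntry;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class ListWithLenSketch {
        public static void main(String[] args) {
            File[] statuses = new File(".").listFiles();
            List<Map.Entry<String, Long>> result =
                    Arrays.stream(statuses == null ? new File[0] : statuses)
                            .filter(File::isFile)                      // fileStatusFilter stand-in
                            .filter(f -> !f.getName().startsWith(".")) // fileFilter stand-in
                            .map(f -> new SimpleEntry<>(f.getPath(), f.length()))
                            .collect(Collectors.toList());
            result.forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue() + " bytes"));
        }
    }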
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
index fdc68b34ab..5139dd4495 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java
@@ -165,22 +165,20 @@ public class LocalOrphanFilesCleanTest {
 
         // randomly delete tags
         List<String> deleteTags = Collections.emptyList();
-        if (!allTags.isEmpty()) {
-            deleteTags = randomlyPick(allTags);
-            for (String tagName : deleteTags) {
-                table.deleteTag(tagName);
-            }
+        deleteTags = randomlyPick(allTags);
+        for (String tagName : deleteTags) {
+            table.deleteTag(tagName);
         }
 
         // first check, nothing will be deleted because the default olderThan interval is 1 day
         LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
-        assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
 
         // second check
         orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        List<Path> deleted = orphanFilesClean.clean();
+        List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
         try {
             validate(deleted, snapshotData, new HashMap<>());
         } catch (Throwable t) {
@@ -363,13 +361,13 @@ public class LocalOrphanFilesCleanTest {
 
         // first check, nothing will be deleted because the default olderThan interval is 1 day
         LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table);
-        assertThat(orphanFilesClean.clean().size()).isEqualTo(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0);
 
         // second check
         orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        List<Path> deleted = orphanFilesClean.clean();
+        List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
         validate(deleted, snapshotData, changelogData);
     }
 
@@ -399,7 +397,7 @@ public class LocalOrphanFilesCleanTest {
         LocalOrphanFilesClean orphanFilesClean =
                 new LocalOrphanFilesClean(
                         table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
-        assertThat(orphanFilesClean.clean().size()).isGreaterThan(0);
+        assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0);
     }
 
     private void writeData(
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 7695c510b1..b4a3a6b359 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -86,11 +87,12 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
         if (mode == null) {
             mode = "DISTRIBUTED";
         }
-        long deletedFiles;
+
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
@@ -101,7 +103,7 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     tableName);
                     break;
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     databaseName,
@@ -116,7 +118,10 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new String[] {String.valueOf(deletedFiles)};
+            return new String[] {
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
index 46b62b6bf3..a168c3785c 100644
--- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
+++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -137,7 +137,7 @@ public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
                         database, tableName);
         ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));
 
-        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
+        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
     }
 
     @Test
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 61bebca24a..23bbbc9b60 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -27,12 +27,15 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializableConsumer;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -61,7 +64,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.LONG_TYPE_INFO;
 import static org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static org.apache.flink.util.Preconditions.checkState;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -81,7 +83,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
     }
 
     @Nullable
-    public DataStream<Long> doOrphanClean(StreamExecutionEnvironment env) {
+    public DataStream<CleanOrphanFilesResult> doOrphanClean(StreamExecutionEnvironment env) {
         Configuration flinkConf = new Configuration();
         flinkConf.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
         flinkConf.set(ExecutionOptions.SORT_INPUTS, false);
@@ -97,8 +99,12 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
 
         // snapshot and changelog files are the root of everything, so they are handled specially
         // here, and subsequently, we will not count their orphan files.
-        AtomicLong deletedInLocal = new AtomicLong(0);
-        cleanSnapshotDir(branches, p -> deletedInLocal.incrementAndGet());
+        AtomicLong deletedFilesCountInLocal = new AtomicLong(0);
+        AtomicLong deletedFilesLenInBytesInLocal = new AtomicLong(0);
+        cleanSnapshotDir(
+                branches,
+                path -> deletedFilesCountInLocal.incrementAndGet(),
+                deletedFilesLenInBytesInLocal::addAndGet);
 
         // branch and manifest file
         final OutputTag<Tuple2<String, String>> manifestOutputTag =
@@ -203,36 +209,45 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                         .map(Path::toUri)
                         .map(Object::toString)
                         .collect(Collectors.toList());
-        DataStream<String> candidates =
+        DataStream<Pair<String, Long>> candidates =
                 env.fromCollection(fileDirs)
                         .process(
-                                new ProcessFunction<String, String>() {
+                                new ProcessFunction<String, Pair<String, Long>>() {
                                     @Override
                                     public void processElement(
                                             String dir,
-                                            ProcessFunction<String, String>.Context ctx,
-                                            Collector<String> out) {
+                                            ProcessFunction<String, Pair<String, Long>>.Context ctx,
+                                            Collector<Pair<String, Long>> out) {
                                         for (FileStatus fileStatus :
                                                 tryBestListingDirs(new Path(dir))) {
                                             if (oldEnough(fileStatus)) {
                                                 out.collect(
-                                                        fileStatus.getPath().toUri().toString());
+                                                        Pair.of(
+                                                                fileStatus
+                                                                        .getPath()
+                                                                        .toUri()
+                                                                        .toString(),
+                                                                fileStatus.getLen()));
                                             }
                                         }
                                     }
                                 });
 
-        DataStream<Long> deleted =
+        DataStream<CleanOrphanFilesResult> deleted =
                 usedFiles
                         .keyBy(f -> f)
-                        .connect(candidates.keyBy(path -> new Path(path).getName()))
+                        .connect(
+                                candidates.keyBy(
+                                        pathAndSize -> new Path(pathAndSize.getKey()).getName()))
                         .transform(
                                 "files_join",
-                                LONG_TYPE_INFO,
-                                new BoundedTwoInputOperator<String, String, Long>() {
+                                TypeInformation.of(CleanOrphanFilesResult.class),
+                                new BoundedTwoInputOperator<
+                                        String, Pair<String, Long>, CleanOrphanFilesResult>() {
 
                                     private boolean buildEnd;
-                                    private long emitted;
+                                    private long emittedFilesCount;
+                                    private long emittedFilesLen;
 
                                     private final Set<String> used = new HashSet<>();
 
@@ -254,8 +269,15 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                             case 2:
                                                 checkState(buildEnd, "Should build ended.");
                                                 LOG.info("Finish probe phase.");
-                                                LOG.info("Clean files: {}", 
emitted);
-                                                output.collect(new 
StreamRecord<>(emitted));
+                                                LOG.info(
+                                                        "Clean files count : 
{}",
+                                                        emittedFilesCount);
+                                                LOG.info("Clean files size : 
{}", emittedFilesLen);
+                                                output.collect(
+                                                        new StreamRecord<>(
+                                                                new 
CleanOrphanFilesResult(
+                                                                        
emittedFilesCount,
+                                                                        
emittedFilesLen)));
                                                 break;
                                         }
                                     }
@@ -266,25 +288,34 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                     }
 
                                     @Override
-                                    public void processElement2(StreamRecord<String> element) {
+                                    public void processElement2(
+                                            StreamRecord<Pair<String, Long>> element) {
                                         checkState(buildEnd, "Should build ended.");
-                                        String value = element.getValue();
+                                        Pair<String, Long> fileInfo = element.getValue();
+                                        String value = fileInfo.getLeft();
                                         Path path = new Path(value);
                                         if (!used.contains(path.getName())) {
+                                            emittedFilesCount++;
+                                            emittedFilesLen += fileInfo.getRight();
                                             fileCleaner.accept(path);
                                             LOG.info("Dry clean: {}", path);
-                                            emitted++;
                                         }
                                     }
                                 });
 
-        if (deletedInLocal.get() != 0) {
-            deleted = deleted.union(env.fromElements(deletedInLocal.get()));
+        if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
+            deleted =
+                    deleted.union(
+                            env.fromElements(
+                                    new CleanOrphanFilesResult(
+                                            deletedFilesCountInLocal.get(),
+                                            deletedFilesLenInBytesInLocal.get())));
         }
+
         return deleted;
     }
 
-    public static long executeDatabaseOrphanFiles(
+    public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             StreamExecutionEnvironment env,
             Catalog catalog,
             long olderThanMillis,
@@ -293,12 +324,13 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
             String databaseName,
             @Nullable String tableName)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
-        List<DataStream<Long>> orphanFilesCleans = new ArrayList<>();
         List<String> tableNames = Collections.singletonList(tableName);
         if (tableName == null || "*".equals(tableName)) {
             tableNames = catalog.listTables(databaseName);
         }
 
+        List<DataStream<CleanOrphanFilesResult>> orphanFilesCleans =
+                new ArrayList<>(tableNames.size());
         for (String t : tableNames) {
             Identifier identifier = new Identifier(databaseName, t);
             Table table = catalog.getTable(identifier);
@@ -307,7 +339,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                     "Only FileStoreTable supports remove-orphan-files action. The table type is '%s'.",
                     table.getClass().getName());
 
-            DataStream<Long> clean =
+            DataStream<CleanOrphanFilesResult> clean =
                     new FlinkOrphanFilesClean(
                                     (FileStoreTable) table,
                                     olderThanMillis,
@@ -319,8 +351,8 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
             }
         }
 
-        DataStream<Long> result = null;
-        for (DataStream<Long> clean : orphanFilesCleans) {
+        DataStream<CleanOrphanFilesResult> result = null;
+        for (DataStream<CleanOrphanFilesResult> clean : orphanFilesCleans) {
             if (result == null) {
                 result = clean;
             } else {
@@ -331,20 +363,24 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
         return sum(result);
     }
 
-    private static long sum(DataStream<Long> deleted) {
-        long deleteCount = 0;
+    private static CleanOrphanFilesResult sum(DataStream<CleanOrphanFilesResult> deleted) {
+        long deletedFilesCount = 0;
+        long deletedFilesLenInBytes = 0;
         if (deleted != null) {
             try {
-                CloseableIterator<Long> iterator =
+                CloseableIterator<CleanOrphanFilesResult> iterator =
                         deleted.global().executeAndCollect("OrphanFilesClean");
                 while (iterator.hasNext()) {
-                    deleteCount += iterator.next();
+                    CleanOrphanFilesResult cleanOrphanFilesResult = iterator.next();
+                    deletedFilesCount += cleanOrphanFilesResult.getDeletedFileCount();
+                    deletedFilesLenInBytes +=
+                            cleanOrphanFilesResult.getDeletedFileTotalLenInBytes();
                 }
                 iterator.close();
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
-        return deleteCount;
+        return new CleanOrphanFilesResult(deletedFilesCount, deletedFilesLenInBytes);
     }
 }
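
(Sketch, not part of the commit: the files_join operator above keys each orphan
candidate by its bare file name — new Path(pathAndSize.getKey()).getName() — so it can
be matched against the used-files stream. The key extraction in isolation, on a plain
URI string instead of org.apache.paimon.fs.Path; the class name JoinKeySketch is
invented:)

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map;

    public class JoinKeySketch {
        // bare file name of a path-like URI, as used for the join key
        static String fileName(String uri) {
            return uri.substring(uri.lastIndexOf('/') + 1);
        }

        public static void main(String[] args) {
            Map.Entry<String, Long> candidate =
                    new SimpleEntry<>("s3://bucket/warehouse/t/data-0.orc", 1024L);
            System.out.println(fileName(candidate.getKey())); // data-0.orc
        }
    }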
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 10ad878e0c..4cd1b3e003 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 
 import org.apache.flink.table.annotation.ArgumentHint;
@@ -75,11 +76,11 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
         if (mode == null) {
             mode = "DISTRIBUTED";
         }
-        long deletedFiles;
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
@@ -90,7 +91,7 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     tableName);
                     break;
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     databaseName,
@@ -105,7 +106,10 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new String[] {String.valueOf(deletedFiles)};
+            return new String[] {
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileCount()),
+                String.valueOf(cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 5f874a5a7f..77f3be2f0c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -148,7 +148,7 @@ public abstract class RemoveOrphanFilesActionITCaseBase extends ActionITCaseBase
                         tableName);
         ImmutableList<Row> actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan));
 
-        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
+        assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2"));
     }
 
     @ParameterizedTest
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 293e84ca14..a929641106 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.operation.CleanOrphanFilesResult;
 import org.apache.paimon.operation.LocalOrphanFilesClean;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.spark.catalog.WithPaimonCatalog;
@@ -66,7 +67,9 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", LongType, true, 
Metadata.empty())
+                        new StructField("deletedFileCount", LongType, true, 
Metadata.empty()),
+                        new StructField(
+                                "deletedFileTotalLenInBytes", LongType, true, 
Metadata.empty())
                     });
 
     private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) {
@@ -104,11 +107,11 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
         Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
         String mode = args.isNullAt(4) ? "DISTRIBUTED" : args.getString(4);
 
-        long deletedFiles;
+        CleanOrphanFilesResult cleanOrphanFilesResult;
         try {
             switch (mode.toUpperCase(Locale.ROOT)) {
                 case "LOCAL":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             LocalOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     identifier.getDatabaseName(),
@@ -120,7 +123,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
                                     args.isNullAt(3) ? null : args.getInt(3));
                     break;
                 case "DISTRIBUTED":
-                    deletedFiles =
+                    cleanOrphanFilesResult =
                             SparkOrphanFilesClean.executeDatabaseOrphanFiles(
                                     catalog,
                                     identifier.getDatabaseName(),
@@ -137,7 +140,12 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
                                     + mode
                                     + ". Only 'DISTRIBUTED' and 'LOCAL' are supported.");
             }
-            return new InternalRow[] {newInternalRow(deletedFiles)};
+
+            return new InternalRow[] {
+                newInternalRow(
+                        cleanOrphanFilesResult.getDeletedFileCount(),
+                        cleanOrphanFilesResult.getDeletedFileTotalLenInBytes())
+            };
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
index 488d70e349..fca0493ede 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
@@ -22,15 +22,14 @@ import org.apache.paimon.{utils, Snapshot}
 import org.apache.paimon.catalog.{Catalog, Identifier}
 import org.apache.paimon.fs.Path
 import org.apache.paimon.manifest.{ManifestEntry, ManifestFile}
-import org.apache.paimon.operation.OrphanFilesClean
+import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean}
 import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.utils.SerializableConsumer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.{functions, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.functions.sum
 
 import java.util
 import java.util.Collections
@@ -50,14 +49,18 @@ case class SparkOrphanFilesClean(
   with SQLConfHelper
   with Logging {
 
-  def doOrphanClean(): (Dataset[Long], Dataset[BranchAndManifestFile]) = {
+  def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile]) = {
     import spark.implicits._
 
     val branches = validBranches()
-    val deletedInLocal = new AtomicLong(0)
+    val deletedFilesCountInLocal = new AtomicLong(0)
+    val deletedFilesLenInBytesInLocal = new AtomicLong(0)
    // snapshot and changelog files are the root of everything, so they are handled specially
     // here, and subsequently, we will not count their orphan files.
-    cleanSnapshotDir(branches, (_: Path) => deletedInLocal.incrementAndGet)
+    cleanSnapshotDir(
+      branches,
+      (_: Path) => deletedFilesCountInLocal.incrementAndGet,
+      size => deletedFilesLenInBytesInLocal.addAndGet(size))
 
     val maxBranchParallelism = Math.min(branches.size(), parallelism)
    // find snapshots using branch and find manifests(manifest, index, statistics) using snapshot
@@ -121,10 +124,10 @@ case class SparkOrphanFilesClean(
       .flatMap {
         dir =>
           tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
-            file => (file.getPath.getName, file.getPath.toUri.toString)
+            file => (file.getPath.getName, file.getPath.toUri.toString, file.getLen)
           }
       }
-      .toDF("name", "path")
+      .toDF("name", "path", "len")
       .repartition(parallelism)
 
     // use left anti to filter files which is not used
@@ -132,21 +135,30 @@ case class SparkOrphanFilesClean(
       .join(usedFiles, $"name" === $"used_name", "left_anti")
       .mapPartitions {
         it =>
-          var deleted = 0L
+          var deletedFilesCount = 0L
+          var deletedFilesLenInBytes = 0L
+
           while (it.hasNext) {
-            val pathToClean = it.next().getString(1)
-            specifiedFileCleaner.accept(new Path(pathToClean))
+            val fileInfo = it.next();
+            val pathToClean = fileInfo.getString(1)
+            val deletedPath = new Path(pathToClean)
+            deletedFilesLenInBytes += fileInfo.getLong(2)
+            specifiedFileCleaner.accept(deletedPath)
             logInfo(s"Cleaned file: $pathToClean")
-            deleted += 1
+            deletedFilesCount += 1
           }
-          logInfo(s"Total cleaned files: $deleted");
-          Iterator.single(deleted)
+          logInfo(
+            s"Total cleaned files: $deletedFilesCount, Total cleaned files len 
: $deletedFilesLenInBytes")
+          Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
+      }
+    val finalDeletedDataset =
+      if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
+        deleted.union(
+          spark.createDataset(
+            Seq((deletedFilesCountInLocal.get(), deletedFilesLenInBytesInLocal.get()))))
+      } else {
+        deleted
       }
-    val finalDeletedDataset = if (deletedInLocal.get() != 0) {
-      deleted.union(spark.createDataset(Seq(deletedInLocal.get())))
-    } else {
-      deleted
-    }
 
     (finalDeletedDataset, usedManifestFiles)
   }
@@ -169,7 +181,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       tableName: String,
       olderThanMillis: Long,
       fileCleaner: SerializableConsumer[Path],
-      parallelismOpt: Integer): Long = {
+      parallelismOpt: Integer): CleanOrphanFilesResult = {
     val spark = SparkSession.active
     val parallelism = if (parallelismOpt == null) {
       Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions)
@@ -192,7 +204,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
         table.asInstanceOf[FileStoreTable]
     }
     if (tables.isEmpty) {
-      return 0
+      return new CleanOrphanFilesResult(0, 0)
     }
     val (deleted, waitToRelease) = tables.map {
       table =>
@@ -207,15 +219,15 @@ object SparkOrphanFilesClean extends SQLConfHelper {
     try {
       val result = deleted
         .reduce((l, r) => l.union(r))
-        .toDF("deleted")
-        .agg(sum("deleted"))
+        .toDF("deletedFilesCount", "deletedFilesLenInBytes")
+        .agg(functions.sum("deletedFilesCount"), functions.sum("deletedFilesLenInBytes"))
         .head()
-      assert(result.schema.size == 1, result.schema)
+      assert(result.schema.size == 2, result.schema)
       if (result.isNullAt(0)) {
         // no files can be deleted
-        0
+        new CleanOrphanFilesResult(0, 0)
       } else {
-        result.getLong(0)
+        new CleanOrphanFilesResult(result.getLong(0), result.getLong(1))
       }
     } finally {
       waitToRelease.foreach(_.unpersist())
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index d9d7381126..3ffe7fba26 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -52,7 +52,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.tryToWriteAtomic(orphanFile2, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0, 0) :: Nil)
 
     val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -63,7 +63,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => 
'$older_than1')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -71,9 +71,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => 
'$older_than2')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: dry run remove orphan files") {
@@ -97,7 +97,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.writeFile(orphanFile2, "b", true)
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0, 0) :: Nil)
 
     val older_than = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -106,10 +106,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => 
'$older_than', dry_run => true)"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: remove database orphan files") {
@@ -146,7 +146,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO2.tryToWriteAtomic(orphanFile22, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), 
Row(0, 0) :: Nil)
 
     val orphanFile12ModTime = fileIO1.getFileStatus(orphanFile12).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -157,7 +157,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than 
=> '$older_than1')"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
@@ -166,10 +166,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than 
=> '$older_than2')"),
-      Row(2) :: Nil
+      Row(2, 2) :: Nil
     )
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), 
Row(0, 0) :: Nil)
   }
 
   test("Paimon procedure: remove orphan files with mode") {
@@ -193,7 +193,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.tryToWriteAtomic(orphanFile2, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0, 0) :: Nil)
 
     val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -205,7 +205,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => 
'$older_than1', mode => 'diSTributed')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -214,9 +214,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
         s"CALL sys.remove_orphan_files(table => 'T', older_than => 
'$older_than2', mode => 'local')"),
-      Row(1) :: Nil)
+      Row(1, 1) :: Nil)
 
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0) :: Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), 
Row(0, 0) :: Nil)
   }
 
 }
