This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 79154d4684 [core] Remove Catalog.fileIO method (#4973)
79154d4684 is described below

commit 79154d46844dd88a4afbc5eefdc4684d07e20342
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 21 18:31:33 2025 +0800

    [core] Remove Catalog.fileIO method (#4973)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  3 +-
 .../java/org/apache/paimon/catalog/Catalog.java    |  7 ---
 .../org/apache/paimon/catalog/DelegateCatalog.java | 18 +++----
 .../migrate/IcebergMigrateHadoopMetadata.java      | 27 ++++++-----
 .../IcebergMigrateHadoopMetadataFactory.java       |  7 ++-
 .../migrate/IcebergMigrateMetadataFactory.java     |  4 +-
 .../paimon/iceberg/migrate/IcebergMigrator.java    | 26 +++-------
 .../paimon/operation/LocalOrphanFilesClean.java    | 26 +++-------
 .../apache/paimon/operation/OrphanFilesClean.java  | 56 +++++++++++-----------
 .../apache/paimon/privilege/PrivilegedCatalog.java |  9 +++-
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 14 ------
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  6 +--
 .../procedure/RemoveOrphanFilesProcedure.java      |  4 +-
 .../flink/action/RemoveOrphanFilesAction.java      |  3 +-
 .../paimon/flink/clone/CopyFileOperator.java       | 33 +++++++++----
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java | 14 ++----
 .../procedure/RemoveOrphanFilesProcedure.java      |  6 +--
 .../privilege/InitFileBasedPrivilegeProcedure.java | 14 +++++-
 .../procedure/RemoveOrphanFilesProcedure.java      |  2 -
 .../spark/procedure/SparkOrphanFilesClean.scala    |  9 ++--
 20 files changed, 126 insertions(+), 162 deletions(-)
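
For downstream callers, the recurring pattern in this patch is to stop asking
the catalog for a FileIO and to take it from a resolved table instead. A
minimal caller-side sketch, assuming an existing catalog and a hypothetical
"my_db.my_table" identifier (exception handling elided):

    // Sketch only: Catalog.fileIO() is gone; the FileIO now comes from the
    // loaded FileStoreTable itself.
    Identifier id = Identifier.fromString("my_db.my_table");
    FileStoreTable table = (FileStoreTable) catalog.getTable(id);
    FileIO fileIO = table.fileIO(); // replaces catalog.fileIO()
    boolean exists = fileIO.exists(table.location());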

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 7a72da38e7..508d8de625 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -86,7 +86,8 @@ public abstract class AbstractCatalog implements Catalog {
         return catalogOptions.toMap();
     }
 
-    @Override
+    public abstract String warehouse();
+
     public FileIO fileIO() {
         return fileIO;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index d0ad86c224..e03cbc3ac8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -368,12 +367,6 @@ public interface Catalog extends AutoCloseable {
 
     // ==================== Catalog Information ==========================
 
-    /** Warehouse root path for creating new databases. */
-    String warehouse();
-
-    /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
-    FileIO fileIO();
-
     /** Catalog options for re-creating this catalog. */
     Map<String, String> options();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index aa7852456e..2d8d3f3e12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.catalog;
 
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -46,21 +45,11 @@ public abstract class DelegateCatalog implements Catalog {
         return wrapped.caseSensitive();
     }
 
-    @Override
-    public String warehouse() {
-        return wrapped.warehouse();
-    }
-
     @Override
     public Map<String, String> options() {
         return wrapped.options();
     }
 
-    @Override
-    public FileIO fileIO() {
-        return wrapped.fileIO();
-    }
-
     @Override
     public List<String> listDatabases() {
         return wrapped.listDatabases();
@@ -200,4 +189,11 @@ public abstract class DelegateCatalog implements Catalog {
     public void close() throws Exception {
         wrapped.close();
     }
+
+    public static Catalog rootCatalog(Catalog catalog) {
+        while (catalog instanceof DelegateCatalog) {
+            catalog = ((DelegateCatalog) catalog).wrapped();
+        }
+        return catalog;
+    }
 }
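
The rootCatalog helper added above unwraps a chain of delegating catalogs. A
short sketch of how the privilege code later in this patch uses it, mirroring
PrivilegedCatalog.tryToCreate (guard and casts as in the diff):

    // Sketch: unwrap delegates, then use AbstractCatalog's warehouse() and
    // fileIO(), which are no longer part of the public Catalog interface.
    Catalog root = DelegateCatalog.rootCatalog(catalog);
    if (root instanceof AbstractCatalog) {
        String warehouse = ((AbstractCatalog) root).warehouse();
        FileIO fileIO = ((AbstractCatalog) root).fileIO();
        // ... build the file-based privilege manager from warehouse + fileIO
    }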
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
index a6c5fb027b..1991be34e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.iceberg.migrate;
 
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -38,16 +39,14 @@ public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
     private static final String VERSION_HINT_FILENAME = "version-hint.text";
     private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";
 
-    private final FileIO fileIO;
     private final Identifier icebergIdentifier;
     private final Options icebergOptions;
 
     private Path icebergLatestMetaVersionPath;
     private IcebergPathFactory icebergMetaPathFactory;
+    private FileIO fileIO;
 
-    public IcebergMigrateHadoopMetadata(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
-        this.fileIO = fileIO;
+    public IcebergMigrateHadoopMetadata(Identifier icebergIdentifier, Options icebergOptions) {
         this.icebergIdentifier = icebergIdentifier;
         this.icebergOptions = icebergOptions;
     }
@@ -58,15 +57,19 @@ public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
                 icebergOptions.get(ICEBERG_WAREHOUSE) != null,
                 "'iceberg_warehouse' is null. "
                         + "In hadoop-catalog, you should explicitly set this argument for finding iceberg metadata.");
+        Path path =
+                new Path(
+                        String.format(
+                                "%s/%s/metadata",
+                                icebergIdentifier.getDatabaseName(),
+                                icebergIdentifier.getTableName()));
+        try {
+            fileIO = FileIO.get(path, CatalogContext.create(icebergOptions));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
         this.icebergMetaPathFactory =
-                new IcebergPathFactory(
-                        new Path(
-                                icebergOptions.get(ICEBERG_WAREHOUSE),
-                                new Path(
-                                        String.format(
-                                                "%s/%s/metadata",
-                                                
icebergIdentifier.getDatabaseName(),
-                                                
icebergIdentifier.getTableName()))));
+                new IcebergPathFactory(new 
Path(icebergOptions.get(ICEBERG_WAREHOUSE), path));
         long icebergLatestMetaVersion = getIcebergLatestMetaVersion();
 
         this.icebergLatestMetaVersionPath =
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
index 6666301014..50857f1d37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.iceberg.migrate;
 
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.options.Options;
 
@@ -28,12 +27,12 @@ public class IcebergMigrateHadoopMetadataFactory implements IcebergMigrateMetada
 
     @Override
     public String identifier() {
-        return IcebergOptions.StorageType.HADOOP_CATALOG.toString() + "_migrate";
+        return IcebergOptions.StorageType.HADOOP_CATALOG + "_migrate";
     }
 
     @Override
     public IcebergMigrateHadoopMetadata create(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
-        return new IcebergMigrateHadoopMetadata(icebergIdentifier, fileIO, icebergOptions);
+            Identifier icebergIdentifier, Options icebergOptions) {
+        return new IcebergMigrateHadoopMetadata(icebergIdentifier, icebergOptions);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
index f727088f5d..3baec4404e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
@@ -20,12 +20,10 @@ package org.apache.paimon.iceberg.migrate;
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.factories.Factory;
-import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.options.Options;
 
 /** Factory to create {@link IcebergMigrateMetadata}. */
 public interface IcebergMigrateMetadataFactory extends Factory {
 
-    IcebergMigrateMetadata create(
-            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions);
+    IcebergMigrateMetadata create(Identifier icebergIdentifier, Options icebergOptions);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 44162dea7f..9e91fa2d18 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -73,7 +73,6 @@ public class IcebergMigrator implements Migrator {
     private final ThreadPoolExecutor executor;
 
     private final Catalog paimonCatalog;
-    private final FileIO paimonFileIO;
     private final String paimonDatabaseName;
     private final String paimonTableName;
 
@@ -100,7 +99,6 @@ public class IcebergMigrator implements Migrator {
             Options icebergOptions,
             Integer parallelism) {
         this.paimonCatalog = paimonCatalog;
-        this.paimonFileIO = paimonCatalog.fileIO();
         this.paimonDatabaseName = paimonDatabaseName;
         this.paimonTableName = paimonTableName;
 
@@ -126,9 +124,7 @@ public class IcebergMigrator implements Migrator {
 
         icebergMigrateMetadata =
                 icebergMigrateMetadataFactory.create(
-                        Identifier.create(icebergDatabaseName, icebergTableName),
-                        paimonFileIO,
-                        icebergOptions);
+                        Identifier.create(icebergDatabaseName, icebergTableName), icebergOptions);
 
         this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
         this.icebergLatestMetadataLocation = icebergMigrateMetadata.icebergLatestMetadataLocation();
@@ -148,6 +144,7 @@ public class IcebergMigrator implements Migrator {
 
         try {
             FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+            FileIO fileIO = paimonTable.fileIO();
 
             IcebergManifestFile manifestFile =
                     IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -214,8 +211,8 @@ public class IcebergMigrator implements Migrator {
                 for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
                     Path newPath = entry.getKey();
                     Path origin = entry.getValue();
-                    if (paimonFileIO.exists(newPath)) {
-                        paimonFileIO.rename(newPath, origin);
+                    if (fileIO.exists(newPath)) {
+                        fileIO.rename(newPath, origin);
                     }
                 }
 
@@ -331,8 +328,7 @@ public class IcebergMigrator implements Migrator {
         BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
         Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
 
-        return new MigrateTask(
-                icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
+        return new MigrateTask(icebergDataFileMetas, paimonTable, partitionRow, newDir, rollback);
     }
 
     private List<MigrateTask> importPartitionedTable(
@@ -347,13 +343,7 @@ public class IcebergMigrator implements Migrator {
             BinaryRow partitionRow = entry.getKey();
             Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
             migrateTasks.add(
-                    new MigrateTask(
-                            entry.getValue(),
-                            paimonFileIO,
-                            paimonTable,
-                            partitionRow,
-                            newDir,
-                            rollback));
+                    new MigrateTask(entry.getValue(), paimonTable, partitionRow, newDir, rollback));
         }
         return migrateTasks;
     }
@@ -362,7 +352,6 @@ public class IcebergMigrator implements Migrator {
     public static class MigrateTask implements Callable<CommitMessage> {
 
         private final List<IcebergDataFileMeta> icebergDataFileMetas;
-        private final FileIO fileIO;
         private final FileStoreTable paimonTable;
         private final BinaryRow partitionRow;
         private final Path newDir;
@@ -370,13 +359,11 @@ public class IcebergMigrator implements Migrator {
 
         public MigrateTask(
                 List<IcebergDataFileMeta> icebergDataFileMetas,
-                FileIO fileIO,
                 FileStoreTable paimonTable,
                 BinaryRow partitionRow,
                 Path newDir,
                 Map<Path, Path> rollback) {
             this.icebergDataFileMetas = icebergDataFileMetas;
-            this.fileIO = fileIO;
             this.paimonTable = paimonTable;
             this.partitionRow = partitionRow;
             this.newDir = newDir;
@@ -385,6 +372,7 @@ public class IcebergMigrator implements Migrator {
 
         @Override
         public CommitMessage call() throws Exception {
+            FileIO fileIO = paimonTable.fileIO();
             if (!fileIO.exists(newDir)) {
                 fileIO.mkdirs(newDir);
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 8f77670472..fc7895b1e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -27,7 +27,6 @@ import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SerializableConsumer;
 
 import javax.annotation.Nullable;
 
@@ -81,15 +80,11 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
     }
 
     public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
-        this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false);
+        this(table, olderThanMillis, false);
     }
 
-    public LocalOrphanFilesClean(
-            FileStoreTable table,
-            long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
-            boolean dryRun) {
-        super(table, olderThanMillis, fileCleaner);
+    public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
+        super(table, olderThanMillis, dryRun);
         this.deleteFiles = new ArrayList<>();
         this.executor =
                 createCachedThreadPool(
@@ -125,7 +120,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
                 .forEach(
                         deleteFileInfo -> {
                             deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
-                            fileCleaner.accept(deleteFileInfo.getLeft());
+                            cleanFile(deleteFileInfo.getLeft());
                         });
         deleteFiles.addAll(
                 candidateDeletes.stream()
@@ -239,7 +234,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
             String databaseName,
             @Nullable String tableName,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism,
             boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
@@ -269,8 +263,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
                     table.getClass().getName());
 
             orphanFilesCleans.add(
-                    new LocalOrphanFilesClean(
-                            (FileStoreTable) table, olderThanMillis, fileCleaner, dryRun));
+                    new LocalOrphanFilesClean((FileStoreTable) table, olderThanMillis, dryRun));
         }
 
         return orphanFilesCleans;
@@ -281,19 +274,12 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
             String databaseName,
             @Nullable String tableName,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
             @Nullable Integer parallelism,
             boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
         List<LocalOrphanFilesClean> tableCleans =
                 createOrphanFilesCleans(
-                        catalog,
-                        databaseName,
-                        tableName,
-                        olderThanMillis,
-                        fileCleaner,
-                        parallelism,
-                        dryRun);
+                        catalog, databaseName, tableName, olderThanMillis, parallelism, dryRun);
 
         ExecutorService executorService =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 63214253d4..03e20ae4f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
@@ -36,7 +35,6 @@ import org.apache.paimon.utils.DateTimeUtils;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SerializableConsumer;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.SupplierWithIOException;
 import org.apache.paimon.utils.TagManager;
@@ -90,18 +88,17 @@ public abstract class OrphanFilesClean implements Serializable {
     protected final FileStoreTable table;
     protected final FileIO fileIO;
     protected final long olderThanMillis;
-    protected final SerializableConsumer<Path> fileCleaner;
+    protected final boolean dryRun;
     protected final int partitionKeysNum;
     protected final Path location;
 
-    public OrphanFilesClean(
-            FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
+    public OrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
         this.table = table;
         this.fileIO = table.fileIO();
         this.partitionKeysNum = table.partitionKeys().size();
         this.location = table.location();
         this.olderThanMillis = olderThanMillis;
-        this.fileCleaner = fileCleaner;
+        this.dryRun = dryRun;
     }
 
     protected List<String> validBranches() {
@@ -163,7 +160,20 @@ public abstract class OrphanFilesClean implements Serializable {
         Long fileSize = deleteFileInfo.getRight();
         deletedFilesConsumer.accept(filePath);
         deletedFilesLenInBytesConsumer.accept(fileSize);
-        fileCleaner.accept(filePath);
+        cleanFile(filePath);
+    }
+
+    protected void cleanFile(Path path) {
+        if (!dryRun) {
+            try {
+                if (fileIO.isDir(path)) {
+                    fileIO.deleteDirectoryQuietly(path);
+                } else {
+                    fileIO.deleteQuietly(path);
+                }
+            } catch (IOException ignored) {
+            }
+        }
     }
 
     protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
@@ -362,28 +372,6 @@ public abstract class OrphanFilesClean implements Serializable {
         return status.getModificationTime() < olderThanMillis;
     }
 
-    public static SerializableConsumer<Path> createFileCleaner(
-            Catalog catalog, @Nullable Boolean dryRun) {
-        SerializableConsumer<Path> fileCleaner;
-        if (Boolean.TRUE.equals(dryRun)) {
-            fileCleaner = path -> {};
-        } else {
-            FileIO fileIO = catalog.fileIO();
-            fileCleaner =
-                    path -> {
-                        try {
-                            if (fileIO.isDir(path)) {
-                                fileIO.deleteDirectoryQuietly(path);
-                            } else {
-                                fileIO.deleteQuietly(path);
-                            }
-                        } catch (IOException ignored) {
-                        }
-                    };
-        }
-        return fileCleaner;
-    }
-
     public static long olderThanMillis(@Nullable String olderThan) {
         if (isNullOrWhitespaceOnly(olderThan)) {
             return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
@@ -412,10 +400,20 @@ public abstract class OrphanFilesClean implements Serializable {
     }
 
     public boolean tryDeleteEmptyDirectory(Path path) {
+        if (dryRun) {
+            return false;
+        }
+
         try {
             return fileIO.delete(path, false);
         } catch (IOException e) {
             return false;
         }
     }
+
+    /** Cleaner to clean files. */
+    public interface FileCleaner extends Serializable {
+
+        void clean(String table, Path path);
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index acbd15a634..8755426670 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.privilege;
 
+import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogLoader;
 import org.apache.paimon.catalog.DelegateCatalog;
@@ -56,10 +57,14 @@ public class PrivilegedCatalog extends DelegateCatalog {
     }
 
     public static Catalog tryToCreate(Catalog catalog, Options options) {
+        if (!(rootCatalog(catalog) instanceof AbstractCatalog)) {
+            return catalog;
+        }
+
         FileBasedPrivilegeManagerLoader fileBasedPrivilegeManagerLoader =
                 new FileBasedPrivilegeManagerLoader(
-                        catalog.warehouse(),
-                        catalog.fileIO(),
+                        ((AbstractCatalog) rootCatalog(catalog)).warehouse(),
+                        ((AbstractCatalog) rootCatalog(catalog)).fileIO(),
                         options.get(PrivilegedCatalog.USER),
                         options.get(PrivilegedCatalog.PASSWORD));
         FileBasedPrivilegeManager fileBasedPrivilegeManager =
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 9c8100662b..abfdb7b351 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -144,11 +144,6 @@ public class RESTCatalog implements Catalog {
         this.fileIO = dataTokenEnabled ? null : fileIOFromOptions(new Path(options.get(WAREHOUSE)));
     }
 
-    @Override
-    public String warehouse() {
-        return context.options().get(WAREHOUSE);
-    }
-
     @Override
     public Map<String, String> options() {
         return context.options().toMap();
@@ -159,15 +154,6 @@ public class RESTCatalog implements Catalog {
         return new RESTCatalogLoader(context);
     }
 
-    @Override
-    public FileIO fileIO() {
-        // TODO remove Catalog.fileIO
-        if (dataTokenEnabled) {
-            throw new UnsupportedOperationException();
-        }
-        return fileIO;
-    }
-
     @Override
     public List<String> listDatabases() {
         ListDatabasesResponse response =
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 05f3adb3c4..ce3c87c4c6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -88,7 +88,7 @@ public class RESTCatalogServer {
         Options conf = new Options();
         conf.setString("warehouse", warehouse);
         this.catalog = TestRESTCatalog.create(CatalogContext.create(conf));
-        this.dispatcher = initDispatcher(catalog, authToken);
+        this.dispatcher = initDispatcher(catalog, warehouse, authToken);
         MockWebServer mockWebServer = new MockWebServer();
         mockWebServer.setDispatcher(dispatcher);
         server = mockWebServer;
@@ -106,7 +106,7 @@ public class RESTCatalogServer {
         server.shutdown();
     }
 
-    public static Dispatcher initDispatcher(Catalog catalog, String authToken) {
+    public static Dispatcher initDispatcher(Catalog catalog, String warehouse, String authToken) {
         return new Dispatcher() {
             @Override
             public MockResponse dispatch(RecordedRequest request) {
@@ -119,7 +119,7 @@ public class RESTCatalogServer {
                     if ("/v1/config".equals(request.getPath())) {
                         return new MockResponse()
                                 .setResponseCode(200)
-                                .setBody(getConfigBody(catalog.warehouse()));
+                                .setBody(getConfigBody(warehouse));
                     } else if (DATABASE_URI.equals(request.getPath())) {
                         return databasesApiHandler(catalog, request);
                     } else if (request.getPath().startsWith(DATABASE_URI)) {
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 4f8217ffce..70f797b03d 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.util.Locale;
 
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
 import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
 /**
@@ -97,7 +96,7 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
                                     olderThanMillis(olderThan),
-                                    createFileCleaner(catalog, dryRun),
+                                    dryRun,
                                     parallelism,
                                     databaseName,
                                     tableName);
@@ -109,7 +108,6 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     databaseName,
                                     tableName,
                                     olderThanMillis(olderThan),
-                                    createFileCleaner(catalog, dryRun),
                                     parallelism,
                                     dryRun);
                     break;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
index 318089b30b..030d76fc62 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
@@ -23,7 +23,6 @@ import javax.annotation.Nullable;
 import java.util.Map;
 
 import static org.apache.paimon.flink.orphan.FlinkOrphanFilesClean.executeDatabaseOrphanFiles;
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
 import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
 /** Action to remove the orphan data files and metadata files. */
@@ -61,7 +60,7 @@ public class RemoveOrphanFilesAction extends ActionBase {
                 env,
                 catalog,
                 olderThanMillis(olderThan),
-                createFileCleaner(catalog, dryRun),
+                dryRun,
                 parallelism == null ? null : Integer.parseInt(parallelism),
                 databaseName,
                 tableName);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index e7002cce1e..040718ad87 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.FlinkCatalogFactory;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.IOUtils;
 
@@ -49,7 +50,8 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
     private transient Catalog sourceCatalog;
     private transient Catalog targetCatalog;
 
-    private transient Map<String, Path> srcLocations;
+    private transient Map<String, FileIO> srcFileIOs;
+    private transient Map<String, FileIO> targetFileIOs;
     private transient Map<String, Path> targetLocations;
 
     public CopyFileOperator(
@@ -64,7 +66,8 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
                 FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
         targetCatalog =
                 FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
-        srcLocations = new HashMap<>();
+        srcFileIOs = new HashMap<>();
+        targetFileIOs = new HashMap<>();
         targetLocations = new HashMap<>();
     }
 
@@ -72,20 +75,32 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
     public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
         CloneFileInfo cloneFileInfo = streamRecord.getValue();
 
-        FileIO sourceTableFileIO = sourceCatalog.fileIO();
-        FileIO targetTableFileIO = targetCatalog.fileIO();
-
-        Path sourceTableRootPath =
-                srcLocations.computeIfAbsent(
+        FileIO sourceTableFileIO =
+                srcFileIOs.computeIfAbsent(
                         cloneFileInfo.getSourceIdentifier(),
                         key -> {
                             try {
-                                return pathOfTable(
-                                        
sourceCatalog.getTable(Identifier.fromString(key)));
+                                return ((FileStoreTable)
+                                                
sourceCatalog.getTable(Identifier.fromString(key)))
+                                        .fileIO();
                             } catch (Catalog.TableNotExistException e) {
                                 throw new RuntimeException(e);
                             }
                         });
+
+        FileIO targetTableFileIO =
+                targetFileIOs.computeIfAbsent(
+                        cloneFileInfo.getTargetIdentifier(),
+                        key -> {
+                            try {
+                                return ((FileStoreTable)
+                                                
targetCatalog.getTable(Identifier.fromString(key)))
+                                        .fileIO();
+                            } catch (Catalog.TableNotExistException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+
         Path targetTableRootPath =
                 targetLocations.computeIfAbsent(
                         cloneFileInfo.getTargetIdentifier(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 23bbbc9b60..39dce07c5e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -32,7 +32,6 @@ import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SerializableConsumer;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -76,9 +75,9 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
     public FlinkOrphanFilesClean(
             FileStoreTable table,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
+            boolean dryRun,
             @Nullable Integer parallelism) {
-        super(table, olderThanMillis, fileCleaner);
+        super(table, olderThanMillis, dryRun);
         this.parallelism = parallelism;
     }
 
@@ -297,7 +296,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                         if (!used.contains(path.getName())) {
                                             emittedFilesCount++;
                                             emittedFilesLen += fileInfo.getRight();
-                                            fileCleaner.accept(path);
+                                            cleanFile(path);
                                             LOG.info("Dry clean: {}", path);
                                         }
                                     }
@@ -319,7 +318,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
             StreamExecutionEnvironment env,
             Catalog catalog,
             long olderThanMillis,
-            SerializableConsumer<Path> fileCleaner,
+            boolean dryRun,
             @Nullable Integer parallelism,
             String databaseName,
             @Nullable String tableName)
@@ -341,10 +340,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
 
             DataStream<CleanOrphanFilesResult> clean =
                     new FlinkOrphanFilesClean(
-                                    (FileStoreTable) table,
-                                    olderThanMillis,
-                                    fileCleaner,
-                                    parallelism)
+                                    (FileStoreTable) table, olderThanMillis, dryRun, parallelism)
                             .doOrphanClean(env);
             if (clean != null) {
                 orphanFilesCleans.add(clean);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 8634e1e5e3..c3983c8aa5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.util.Locale;
 
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
 import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
 
 /**
@@ -85,7 +84,7 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     procedureContext.getExecutionEnvironment(),
                                     catalog,
                                     olderThanMillis(olderThan),
-                                    createFileCleaner(catalog, dryRun),
+                                    dryRun != null && dryRun,
                                     parallelism,
                                     databaseName,
                                     tableName);
@@ -97,9 +96,8 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
                                     databaseName,
                                     tableName,
                                     olderThanMillis(olderThan),
-                                    createFileCleaner(catalog, dryRun),
                                     parallelism,
-                                    dryRun);
+                                    dryRun != null && dryRun);
                     break;
                 default:
                     throw new IllegalArgumentException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
index 40c58a1c8a..4d2e8b3855 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.procedure.privilege;
 
+import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.flink.procedure.ProcedureBase;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.privilege.FileBasedPrivilegeManager;
@@ -29,6 +30,8 @@ import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
 
+import static org.apache.paimon.catalog.DelegateCatalog.rootCatalog;
+
 /**
  * Procedure to initialize file-based privilege system in warehouse. This procedure will
  * automatically create a root user with the provided password. Usage:
@@ -48,11 +51,18 @@ public class InitFileBasedPrivilegeProcedure extends ProcedureBase {
             throw new IllegalArgumentException("Catalog is already a PrivilegedCatalog");
         }
 
+        if (!(rootCatalog(catalog) instanceof AbstractCatalog)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Catalog %s cannot support Privileged Catalog.",
+                            rootCatalog(catalog).getClass().getName()));
+        }
+
         Options options = new Options(catalog.options());
         PrivilegeManager privilegeManager =
                 new FileBasedPrivilegeManager(
-                        catalog.warehouse(),
-                        catalog.fileIO(),
+                        ((AbstractCatalog) rootCatalog(catalog)).warehouse(),
+                        ((AbstractCatalog) rootCatalog(catalog)).fileIO(),
                         options.get(PrivilegedCatalog.USER),
                         options.get(PrivilegedCatalog.PASSWORD));
         privilegeManager.initializePrivilege(rootPassword);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 604b1d9b19..7ddf2eba03 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -120,7 +120,6 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
                                     OrphanFilesClean.olderThanMillis(olderThan),
-                                    OrphanFilesClean.createFileCleaner(catalog, dryRun),
                                     parallelism,
                                     dryRun);
                     break;
@@ -131,7 +130,6 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
                                     OrphanFilesClean.olderThanMillis(olderThan),
-                                    OrphanFilesClean.createFileCleaner(catalog, dryRun),
                                     parallelism,
                                     dryRun);
                     break;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 328a11c017..010a3e4ede 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -44,11 +44,10 @@ import scala.collection.mutable.ArrayBuffer
 case class SparkOrphanFilesClean(
     specifiedTable: FileStoreTable,
     specifiedOlderThanMillis: Long,
-    specifiedFileCleaner: SerializableConsumer[Path],
     parallelism: Int,
-    dryRun: Boolean,
+    dryRunPara: Boolean,
     @transient spark: SparkSession)
-  extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner)
+  extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, dryRunPara)
   with SQLConfHelper
   with Logging {
 
@@ -150,7 +149,7 @@ case class SparkOrphanFilesClean(
             val pathToClean = fileInfo.getString(1)
             val deletedPath = new Path(pathToClean)
             deletedFilesLenInBytes += fileInfo.getLong(2)
-            specifiedFileCleaner.accept(deletedPath)
+            cleanFile(deletedPath)
             logInfo(s"Cleaned file: $pathToClean")
             dataDirs.add(fileInfo.getString(3))
             deletedFilesCount += 1
@@ -198,7 +197,6 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       databaseName: String,
       tableName: String,
       olderThanMillis: Long,
-      fileCleaner: SerializableConsumer[Path],
       parallelismOpt: Integer,
       dryRun: Boolean): CleanOrphanFilesResult = {
     val spark = SparkSession.active
@@ -230,7 +228,6 @@ object SparkOrphanFilesClean extends SQLConfHelper {
         new SparkOrphanFilesClean(
           table,
           olderThanMillis,
-          fileCleaner,
           parallelism,
           dryRun,
           spark
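
With the SerializableConsumer<Path> fileCleaner parameter removed, a dry run
is now just a boolean flag and the delete logic lives in
OrphanFilesClean.cleanFile. A minimal usage sketch, assuming an already
loaded FileStoreTable and a hypothetical timestamp string:

    // Sketch: with dryRun = true, cleanFile() and tryDeleteEmptyDirectory()
    // skip actual deletion, so the clean only reports candidates.
    long threshold = OrphanFilesClean.olderThanMillis("2025-01-01 00:00:00");
    LocalOrphanFilesClean clean = new LocalOrphanFilesClean(table, threshold, true);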

