This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 79154d4684 [core] Remove Catalog.fileIO method (#4973)
79154d4684 is described below
commit 79154d46844dd88a4afbc5eefdc4684d07e20342
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Jan 21 18:31:33 2025 +0800
[core] Remove Catalog.fileIO method (#4973)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 3 +-
.../java/org/apache/paimon/catalog/Catalog.java | 7 ---
.../org/apache/paimon/catalog/DelegateCatalog.java | 18 +++----
.../migrate/IcebergMigrateHadoopMetadata.java | 27 ++++++-----
.../IcebergMigrateHadoopMetadataFactory.java | 7 ++-
.../migrate/IcebergMigrateMetadataFactory.java | 4 +-
.../paimon/iceberg/migrate/IcebergMigrator.java | 26 +++-------
.../paimon/operation/LocalOrphanFilesClean.java | 26 +++-------
.../apache/paimon/operation/OrphanFilesClean.java | 56 +++++++++++-----------
.../apache/paimon/privilege/PrivilegedCatalog.java | 9 +++-
.../java/org/apache/paimon/rest/RESTCatalog.java | 14 ------
.../org/apache/paimon/rest/RESTCatalogServer.java | 6 +--
.../procedure/RemoveOrphanFilesProcedure.java | 4 +-
.../flink/action/RemoveOrphanFilesAction.java | 3 +-
.../paimon/flink/clone/CopyFileOperator.java | 33 +++++++++----
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 14 ++----
.../procedure/RemoveOrphanFilesProcedure.java | 6 +--
.../privilege/InitFileBasedPrivilegeProcedure.java | 14 +++++-
.../procedure/RemoveOrphanFilesProcedure.java | 2 -
.../spark/procedure/SparkOrphanFilesClean.scala | 9 ++--
20 files changed, 126 insertions(+), 162 deletions(-)
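
Note for readers: callers that previously grabbed a catalog-wide FileIO via
Catalog.fileIO() now read it from a loaded table instead. A minimal sketch of
the new call pattern, assuming an existing catalog and a file-based table (the
helper name and table identifier are illustrative, not part of this commit):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.table.FileStoreTable;

    // Sketch: resolve the table first, then use the FileIO it carries.
    static FileIO fileIOFor(Catalog catalog, String fullTableName)
            throws Catalog.TableNotExistException {
        FileStoreTable table =
                (FileStoreTable) catalog.getTable(Identifier.fromString(fullTableName));
        return table.fileIO(); // previously: catalog.fileIO()
    }
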
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 7a72da38e7..508d8de625 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -86,7 +86,8 @@ public abstract class AbstractCatalog implements Catalog {
return catalogOptions.toMap();
}
- @Override
+ public abstract String warehouse();
+
public FileIO fileIO() {
return fileIO;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index d0ad86c224..e03cbc3ac8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -19,7 +19,6 @@
package org.apache.paimon.catalog;
import org.apache.paimon.annotation.Public;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -368,12 +367,6 @@ public interface Catalog extends AutoCloseable {
// ==================== Catalog Information ==========================
- /** Warehouse root path for creating new databases. */
- String warehouse();
-
- /** {@link FileIO} of this catalog. It can access {@link #warehouse()} path. */
- FileIO fileIO();
-
/** Catalog options for re-creating this catalog. */
Map<String, String> options();
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index aa7852456e..2d8d3f3e12 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -18,7 +18,6 @@
package org.apache.paimon.catalog;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -46,21 +45,11 @@ public abstract class DelegateCatalog implements Catalog {
return wrapped.caseSensitive();
}
- @Override
- public String warehouse() {
- return wrapped.warehouse();
- }
-
@Override
public Map<String, String> options() {
return wrapped.options();
}
- @Override
- public FileIO fileIO() {
- return wrapped.fileIO();
- }
-
@Override
public List<String> listDatabases() {
return wrapped.listDatabases();
@@ -200,4 +189,11 @@ public abstract class DelegateCatalog implements Catalog {
public void close() throws Exception {
wrapped.close();
}
+
+ public static Catalog rootCatalog(Catalog catalog) {
+ while (catalog instanceof DelegateCatalog) {
+ catalog = ((DelegateCatalog) catalog).wrapped();
+ }
+ return catalog;
+ }
}
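
The rootCatalog helper added above unwraps any chain of DelegateCatalog
wrappers. A short usage sketch (the helper below is illustrative, assuming the
wrapped catalog may or may not be file-based):

    import org.apache.paimon.catalog.AbstractCatalog;
    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.DelegateCatalog;

    // Sketch: peel off delegates (e.g. PrivilegedCatalog) to reach the real catalog.
    static String warehouseOf(Catalog catalog) {
        Catalog root = DelegateCatalog.rootCatalog(catalog);
        if (root instanceof AbstractCatalog) {
            // Only file-based catalogs still expose warehouse()/fileIO().
            return ((AbstractCatalog) root).warehouse();
        }
        throw new UnsupportedOperationException(
                "Catalog " + root.getClass().getName() + " has no warehouse path");
    }
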
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
index a6c5fb027b..1991be34e9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
@@ -18,6 +18,7 @@
package org.apache.paimon.iceberg.migrate;
+import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -38,16 +39,14 @@ public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
private static final String VERSION_HINT_FILENAME = "version-hint.text";
private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";
- private final FileIO fileIO;
private final Identifier icebergIdentifier;
private final Options icebergOptions;
private Path icebergLatestMetaVersionPath;
private IcebergPathFactory icebergMetaPathFactory;
+ private FileIO fileIO;
- public IcebergMigrateHadoopMetadata(
- Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
- this.fileIO = fileIO;
+ public IcebergMigrateHadoopMetadata(Identifier icebergIdentifier, Options icebergOptions) {
this.icebergIdentifier = icebergIdentifier;
this.icebergOptions = icebergOptions;
}
@@ -58,15 +57,19 @@ public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
icebergOptions.get(ICEBERG_WAREHOUSE) != null,
"'iceberg_warehouse' is null. "
+ "In hadoop-catalog, you should explicitly set this
argument for finding iceberg metadata.");
+ Path path =
+ new Path(
+ String.format(
+ "%s/%s/metadata",
+ icebergIdentifier.getDatabaseName(),
+ icebergIdentifier.getTableName()));
+ try {
+ fileIO = FileIO.get(path, CatalogContext.create(icebergOptions));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
this.icebergMetaPathFactory =
- new IcebergPathFactory(
- new Path(
- icebergOptions.get(ICEBERG_WAREHOUSE),
- new Path(
- String.format(
- "%s/%s/metadata",
- icebergIdentifier.getDatabaseName(),
- icebergIdentifier.getTableName()))));
+ new IcebergPathFactory(new Path(icebergOptions.get(ICEBERG_WAREHOUSE), path));
long icebergLatestMetaVersion = getIcebergLatestMetaVersion();
this.icebergLatestMetaVersionPath =
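
As the constructor change above shows, the Hadoop migrate path now resolves its
own FileIO from the Iceberg options instead of borrowing the Paimon catalog's.
A sketch of that resolution in isolation (the warehouse option value and table
path are illustrative):

    import java.io.IOException;

    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.options.Options;

    // Sketch: FileIO.get picks an implementation from the path and catalog context.
    static FileIO resolveIcebergFileIO(Options icebergOptions) throws IOException {
        Path metaDir =
                new Path(
                        icebergOptions.get("iceberg_warehouse"),
                        new Path("my_db/my_table/metadata"));
        return FileIO.get(metaDir, CatalogContext.create(icebergOptions));
    }
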
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
index 6666301014..50857f1d37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.iceberg.migrate;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.options.Options;
@@ -28,12 +27,12 @@ public class IcebergMigrateHadoopMetadataFactory implements IcebergMigrateMetada
@Override
public String identifier() {
- return IcebergOptions.StorageType.HADOOP_CATALOG.toString() + "_migrate";
+ return IcebergOptions.StorageType.HADOOP_CATALOG + "_migrate";
}
@Override
public IcebergMigrateHadoopMetadata create(
- Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
- return new IcebergMigrateHadoopMetadata(icebergIdentifier, fileIO, icebergOptions);
+ Identifier icebergIdentifier, Options icebergOptions) {
+ return new IcebergMigrateHadoopMetadata(icebergIdentifier, icebergOptions);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
index f727088f5d..3baec4404e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
@@ -20,12 +20,10 @@ package org.apache.paimon.iceberg.migrate;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.factories.Factory;
-import org.apache.paimon.fs.FileIO;
import org.apache.paimon.options.Options;
/** Factory to create {@link IcebergMigrateMetadata}. */
public interface IcebergMigrateMetadataFactory extends Factory {
- IcebergMigrateMetadata create(
- Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions);
+ IcebergMigrateMetadata create(Identifier icebergIdentifier, Options icebergOptions);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 44162dea7f..9e91fa2d18 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -73,7 +73,6 @@ public class IcebergMigrator implements Migrator {
private final ThreadPoolExecutor executor;
private final Catalog paimonCatalog;
- private final FileIO paimonFileIO;
private final String paimonDatabaseName;
private final String paimonTableName;
@@ -100,7 +99,6 @@ public class IcebergMigrator implements Migrator {
Options icebergOptions,
Integer parallelism) {
this.paimonCatalog = paimonCatalog;
- this.paimonFileIO = paimonCatalog.fileIO();
this.paimonDatabaseName = paimonDatabaseName;
this.paimonTableName = paimonTableName;
@@ -126,9 +124,7 @@ public class IcebergMigrator implements Migrator {
icebergMigrateMetadata =
icebergMigrateMetadataFactory.create(
- Identifier.create(icebergDatabaseName, icebergTableName),
- paimonFileIO,
- icebergOptions);
+ Identifier.create(icebergDatabaseName, icebergTableName), icebergOptions);
this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
this.icebergLatestMetadataLocation = icebergMigrateMetadata.icebergLatestMetadataLocation();
@@ -148,6 +144,7 @@ public class IcebergMigrator implements Migrator {
try {
FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+ FileIO fileIO = paimonTable.fileIO();
IcebergManifestFile manifestFile =
IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
@@ -214,8 +211,8 @@ public class IcebergMigrator implements Migrator {
for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
Path newPath = entry.getKey();
Path origin = entry.getValue();
- if (paimonFileIO.exists(newPath)) {
- paimonFileIO.rename(newPath, origin);
+ if (fileIO.exists(newPath)) {
+ fileIO.rename(newPath, origin);
}
}
@@ -331,8 +328,7 @@ public class IcebergMigrator implements Migrator {
BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
- return new MigrateTask(
- icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
+ return new MigrateTask(icebergDataFileMetas, paimonTable, partitionRow, newDir, rollback);
}
private List<MigrateTask> importPartitionedTable(
@@ -347,13 +343,7 @@ public class IcebergMigrator implements Migrator {
BinaryRow partitionRow = entry.getKey();
Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
migrateTasks.add(
- new MigrateTask(
- entry.getValue(),
- paimonFileIO,
- paimonTable,
- partitionRow,
- newDir,
- rollback));
+ new MigrateTask(entry.getValue(), paimonTable, partitionRow, newDir, rollback));
}
return migrateTasks;
}
@@ -362,7 +352,6 @@ public class IcebergMigrator implements Migrator {
public static class MigrateTask implements Callable<CommitMessage> {
private final List<IcebergDataFileMeta> icebergDataFileMetas;
- private final FileIO fileIO;
private final FileStoreTable paimonTable;
private final BinaryRow partitionRow;
private final Path newDir;
@@ -370,13 +359,11 @@ public class IcebergMigrator implements Migrator {
public MigrateTask(
List<IcebergDataFileMeta> icebergDataFileMetas,
- FileIO fileIO,
FileStoreTable paimonTable,
BinaryRow partitionRow,
Path newDir,
Map<Path, Path> rollback) {
this.icebergDataFileMetas = icebergDataFileMetas;
- this.fileIO = fileIO;
this.paimonTable = paimonTable;
this.partitionRow = partitionRow;
this.newDir = newDir;
@@ -385,6 +372,7 @@ public class IcebergMigrator implements Migrator {
@Override
public CommitMessage call() throws Exception {
+ FileIO fileIO = paimonTable.fileIO();
if (!fileIO.exists(newDir)) {
fileIO.mkdirs(newDir);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 8f77670472..fc7895b1e2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -27,7 +27,6 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SerializableConsumer;
import javax.annotation.Nullable;
@@ -81,15 +80,11 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
}
public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
- this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false);
+ this(table, olderThanMillis, false);
}
- public LocalOrphanFilesClean(
- FileStoreTable table,
- long olderThanMillis,
- SerializableConsumer<Path> fileCleaner,
- boolean dryRun) {
- super(table, olderThanMillis, fileCleaner);
+ public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
+ super(table, olderThanMillis, dryRun);
this.deleteFiles = new ArrayList<>();
this.executor =
createCachedThreadPool(
@@ -125,7 +120,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
.forEach(
deleteFileInfo -> {
deletedFilesLenInBytes.addAndGet(deleteFileInfo.getRight());
- fileCleaner.accept(deleteFileInfo.getLeft());
+ cleanFile(deleteFileInfo.getLeft());
});
deleteFiles.addAll(
candidateDeletes.stream()
@@ -239,7 +234,6 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
String databaseName,
@Nullable String tableName,
long olderThanMillis,
- SerializableConsumer<Path> fileCleaner,
@Nullable Integer parallelism,
boolean dryRun)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
@@ -269,8 +263,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
table.getClass().getName());
orphanFilesCleans.add(
- new LocalOrphanFilesClean(
- (FileStoreTable) table, olderThanMillis, fileCleaner, dryRun));
+ new LocalOrphanFilesClean((FileStoreTable) table, olderThanMillis, dryRun));
}
return orphanFilesCleans;
@@ -281,19 +274,12 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
String databaseName,
@Nullable String tableName,
long olderThanMillis,
- SerializableConsumer<Path> fileCleaner,
@Nullable Integer parallelism,
boolean dryRun)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
List<LocalOrphanFilesClean> tableCleans =
createOrphanFilesCleans(
- catalog,
- databaseName,
- tableName,
- olderThanMillis,
- fileCleaner,
- parallelism,
- dryRun);
+ catalog, databaseName, tableName, olderThanMillis, parallelism, dryRun);
ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 63214253d4..03e20ae4f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -19,7 +19,6 @@
package org.apache.paimon.operation;
import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
@@ -36,7 +35,6 @@ import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
-import org.apache.paimon.utils.SerializableConsumer;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SupplierWithIOException;
import org.apache.paimon.utils.TagManager;
@@ -90,18 +88,17 @@ public abstract class OrphanFilesClean implements Serializable {
protected final FileStoreTable table;
protected final FileIO fileIO;
protected final long olderThanMillis;
- protected final SerializableConsumer<Path> fileCleaner;
+ protected final boolean dryRun;
protected final int partitionKeysNum;
protected final Path location;
- public OrphanFilesClean(
- FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
+ public OrphanFilesClean(FileStoreTable table, long olderThanMillis, boolean dryRun) {
this.table = table;
this.fileIO = table.fileIO();
this.partitionKeysNum = table.partitionKeys().size();
this.location = table.location();
this.olderThanMillis = olderThanMillis;
- this.fileCleaner = fileCleaner;
+ this.dryRun = dryRun;
}
protected List<String> validBranches() {
@@ -163,7 +160,20 @@ public abstract class OrphanFilesClean implements Serializable {
Long fileSize = deleteFileInfo.getRight();
deletedFilesConsumer.accept(filePath);
deletedFilesLenInBytesConsumer.accept(fileSize);
- fileCleaner.accept(filePath);
+ cleanFile(filePath);
+ }
+
+ protected void cleanFile(Path path) {
+ if (!dryRun) {
+ try {
+ if (fileIO.isDir(path)) {
+ fileIO.deleteDirectoryQuietly(path);
+ } else {
+ fileIO.deleteQuietly(path);
+ }
+ } catch (IOException ignored) {
+ }
+ }
}
protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
@@ -362,28 +372,6 @@ public abstract class OrphanFilesClean implements Serializable {
return status.getModificationTime() < olderThanMillis;
}
- public static SerializableConsumer<Path> createFileCleaner(
- Catalog catalog, @Nullable Boolean dryRun) {
- SerializableConsumer<Path> fileCleaner;
- if (Boolean.TRUE.equals(dryRun)) {
- fileCleaner = path -> {};
- } else {
- FileIO fileIO = catalog.fileIO();
- fileCleaner =
- path -> {
- try {
- if (fileIO.isDir(path)) {
- fileIO.deleteDirectoryQuietly(path);
- } else {
- fileIO.deleteQuietly(path);
- }
- } catch (IOException ignored) {
- }
- };
- }
- return fileCleaner;
- }
-
public static long olderThanMillis(@Nullable String olderThan) {
if (isNullOrWhitespaceOnly(olderThan)) {
return System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1);
@@ -412,10 +400,20 @@ public abstract class OrphanFilesClean implements Serializable {
}
public boolean tryDeleteEmptyDirectory(Path path) {
+ if (dryRun) {
+ return false;
+ }
+
try {
return fileIO.delete(path, false);
} catch (IOException e) {
return false;
}
}
+
+ /** Cleaner to clean files. */
+ public interface FileCleaner extends Serializable {
+
+ void clean(String table, Path path);
+ }
}
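
With SerializableConsumer gone, dry-run handling now lives inside
OrphanFilesClean itself: cleanFile is a no-op when dryRun is true, and
tryDeleteEmptyDirectory short-circuits the same way. A hedged sketch of the
caller's view (table construction omitted):

    // Sketch: same table, two modes; only the boolean differs.
    LocalOrphanFilesClean dryClean = new LocalOrphanFilesClean(table, olderThanMillis, true);
    LocalOrphanFilesClean realClean = new LocalOrphanFilesClean(table, olderThanMillis, false);
    // dryClean reports orphan candidates without deleting; realClean deletes quietly.
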
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
index acbd15a634..8755426670 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedCatalog.java
@@ -18,6 +18,7 @@
package org.apache.paimon.privilege;
+import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogLoader;
import org.apache.paimon.catalog.DelegateCatalog;
@@ -56,10 +57,14 @@ public class PrivilegedCatalog extends DelegateCatalog {
}
public static Catalog tryToCreate(Catalog catalog, Options options) {
+ if (!(rootCatalog(catalog) instanceof AbstractCatalog)) {
+ return catalog;
+ }
+
FileBasedPrivilegeManagerLoader fileBasedPrivilegeManagerLoader =
new FileBasedPrivilegeManagerLoader(
- catalog.warehouse(),
- catalog.fileIO(),
+ ((AbstractCatalog) rootCatalog(catalog)).warehouse(),
+ ((AbstractCatalog) rootCatalog(catalog)).fileIO(),
options.get(PrivilegedCatalog.USER),
options.get(PrivilegedCatalog.PASSWORD));
FileBasedPrivilegeManager fileBasedPrivilegeManager =
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 9c8100662b..abfdb7b351 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -144,11 +144,6 @@ public class RESTCatalog implements Catalog {
this.fileIO = dataTokenEnabled ? null : fileIOFromOptions(new Path(options.get(WAREHOUSE)));
}
- @Override
- public String warehouse() {
- return context.options().get(WAREHOUSE);
- }
-
@Override
public Map<String, String> options() {
return context.options().toMap();
@@ -159,15 +154,6 @@ public class RESTCatalog implements Catalog {
return new RESTCatalogLoader(context);
}
- @Override
- public FileIO fileIO() {
- // TODO remove Catalog.fileIO
- if (dataTokenEnabled) {
- throw new UnsupportedOperationException();
- }
- return fileIO;
- }
-
@Override
public List<String> listDatabases() {
ListDatabasesResponse response =
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 05f3adb3c4..ce3c87c4c6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -88,7 +88,7 @@ public class RESTCatalogServer {
Options conf = new Options();
conf.setString("warehouse", warehouse);
this.catalog = TestRESTCatalog.create(CatalogContext.create(conf));
- this.dispatcher = initDispatcher(catalog, authToken);
+ this.dispatcher = initDispatcher(catalog, warehouse, authToken);
MockWebServer mockWebServer = new MockWebServer();
mockWebServer.setDispatcher(dispatcher);
server = mockWebServer;
@@ -106,7 +106,7 @@ public class RESTCatalogServer {
server.shutdown();
}
- public static Dispatcher initDispatcher(Catalog catalog, String authToken) {
+ public static Dispatcher initDispatcher(Catalog catalog, String warehouse, String authToken) {
return new Dispatcher() {
@Override
public MockResponse dispatch(RecordedRequest request) {
@@ -119,7 +119,7 @@ public class RESTCatalogServer {
if ("/v1/config".equals(request.getPath())) {
return new MockResponse()
.setResponseCode(200)
- .setBody(getConfigBody(catalog.warehouse()));
+ .setBody(getConfigBody(warehouse));
} else if (DATABASE_URI.equals(request.getPath())) {
return databasesApiHandler(catalog, request);
} else if (request.getPath().startsWith(DATABASE_URI)) {
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 4f8217ffce..70f797b03d 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -27,7 +27,6 @@ import org.apache.flink.table.procedure.ProcedureContext;
import java.util.Locale;
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
/**
@@ -97,7 +96,7 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
procedureContext.getExecutionEnvironment(),
catalog,
olderThanMillis(olderThan),
- createFileCleaner(catalog, dryRun),
+ dryRun,
parallelism,
databaseName,
tableName);
@@ -109,7 +108,6 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
databaseName,
tableName,
olderThanMillis(olderThan),
- createFileCleaner(catalog, dryRun),
parallelism,
dryRun);
break;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
index 318089b30b..030d76fc62 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
@@ -23,7 +23,6 @@ import javax.annotation.Nullable;
import java.util.Map;
import static org.apache.paimon.flink.orphan.FlinkOrphanFilesClean.executeDatabaseOrphanFiles;
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
/** Action to remove the orphan data files and metadata files. */
@@ -61,7 +60,7 @@ public class RemoveOrphanFilesAction extends ActionBase {
env,
catalog,
olderThanMillis(olderThan),
- createFileCleaner(catalog, dryRun),
+ dryRun,
parallelism == null ? null : Integer.parseInt(parallelism),
databaseName,
tableName);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
index e7002cce1e..040718ad87 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/CopyFileOperator.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
@@ -49,7 +50,8 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
private transient Catalog sourceCatalog;
private transient Catalog targetCatalog;
- private transient Map<String, Path> srcLocations;
+ private transient Map<String, FileIO> srcFileIOs;
+ private transient Map<String, FileIO> targetFileIOs;
private transient Map<String, Path> targetLocations;
public CopyFileOperator(
@@ -64,7 +66,8 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
sourceCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(sourceCatalogConfig));
targetCatalog =
FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(targetCatalogConfig));
- srcLocations = new HashMap<>();
+ srcFileIOs = new HashMap<>();
+ targetFileIOs = new HashMap<>();
targetLocations = new HashMap<>();
}
@@ -72,20 +75,32 @@ public class CopyFileOperator extends AbstractStreamOperator<CloneFileInfo>
public void processElement(StreamRecord<CloneFileInfo> streamRecord) throws Exception {
CloneFileInfo cloneFileInfo = streamRecord.getValue();
- FileIO sourceTableFileIO = sourceCatalog.fileIO();
- FileIO targetTableFileIO = targetCatalog.fileIO();
-
- Path sourceTableRootPath =
- srcLocations.computeIfAbsent(
+ FileIO sourceTableFileIO =
+ srcFileIOs.computeIfAbsent(
cloneFileInfo.getSourceIdentifier(),
key -> {
try {
- return pathOfTable(
- sourceCatalog.getTable(Identifier.fromString(key)));
+ return ((FileStoreTable)
+ sourceCatalog.getTable(Identifier.fromString(key)))
+ .fileIO();
} catch (Catalog.TableNotExistException e) {
throw new RuntimeException(e);
}
});
+
+ FileIO targetTableFileIO =
+ targetFileIOs.computeIfAbsent(
+ cloneFileInfo.getTargetIdentifier(),
+ key -> {
+ try {
+ return ((FileStoreTable)
+ targetCatalog.getTable(Identifier.fromString(key)))
+ .fileIO();
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
Path targetTableRootPath =
targetLocations.computeIfAbsent(
cloneFileInfo.getTargetIdentifier(),
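
CopyFileOperator now caches one FileIO per table identifier rather than reusing
a single catalog-level FileIO. The caching idiom, reduced to its core (the
identifier string and the free "catalog" variable are illustrative; imports as
in the operator above):

    Map<String, FileIO> fileIOs = new HashMap<>();
    // Sketch: look up, or load and cache, the FileIO owned by a table.
    FileIO io =
            fileIOs.computeIfAbsent(
                    "my_db.my_table",
                    key -> {
                        try {
                            return ((FileStoreTable) catalog.getTable(Identifier.fromString(key)))
                                    .fileIO();
                        } catch (Catalog.TableNotExistException e) {
                            throw new RuntimeException(e);
                        }
                    });
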
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 23bbbc9b60..39dce07c5e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -32,7 +32,6 @@ import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SerializableConsumer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -76,9 +75,9 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
public FlinkOrphanFilesClean(
FileStoreTable table,
long olderThanMillis,
- SerializableConsumer<Path> fileCleaner,
+ boolean dryRun,
@Nullable Integer parallelism) {
- super(table, olderThanMillis, fileCleaner);
+ super(table, olderThanMillis, dryRun);
this.parallelism = parallelism;
}
@@ -297,7 +296,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
if (!used.contains(path.getName())) {
emittedFilesCount++;
emittedFilesLen += fileInfo.getRight();
- fileCleaner.accept(path);
+ cleanFile(path);
LOG.info("Dry clean: {}", path);
}
}
@@ -319,7 +318,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
StreamExecutionEnvironment env,
Catalog catalog,
long olderThanMillis,
- SerializableConsumer<Path> fileCleaner,
+ boolean dryRun,
@Nullable Integer parallelism,
String databaseName,
@Nullable String tableName)
@@ -341,10 +340,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
DataStream<CleanOrphanFilesResult> clean =
new FlinkOrphanFilesClean(
- (FileStoreTable) table,
- olderThanMillis,
- fileCleaner,
- parallelism)
+ (FileStoreTable) table, olderThanMillis, dryRun, parallelism)
.doOrphanClean(env);
if (clean != null) {
orphanFilesCleans.add(clean);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 8634e1e5e3..c3983c8aa5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -30,7 +30,6 @@ import org.apache.flink.table.procedure.ProcedureContext;
import java.util.Locale;
-import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
/**
@@ -85,7 +84,7 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
procedureContext.getExecutionEnvironment(),
catalog,
olderThanMillis(olderThan),
- createFileCleaner(catalog, dryRun),
+ dryRun != null && dryRun,
parallelism,
databaseName,
tableName);
@@ -97,9 +96,8 @@ public class RemoveOrphanFilesProcedure extends ProcedureBase {
databaseName,
tableName,
olderThanMillis(olderThan),
- createFileCleaner(catalog, dryRun),
parallelism,
- dryRun);
+ dryRun != null && dryRun);
break;
default:
throw new IllegalArgumentException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
index 40c58a1c8a..4d2e8b3855 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/privilege/InitFileBasedPrivilegeProcedure.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.procedure.privilege;
+import org.apache.paimon.catalog.AbstractCatalog;
import org.apache.paimon.flink.procedure.ProcedureBase;
import org.apache.paimon.options.Options;
import org.apache.paimon.privilege.FileBasedPrivilegeManager;
@@ -29,6 +30,8 @@ import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
+import static org.apache.paimon.catalog.DelegateCatalog.rootCatalog;
+
/**
* Procedure to initialize file-based privilege system in warehouse. This procedure will
* automatically create a root user with the provided password. Usage:
@@ -48,11 +51,18 @@ public class InitFileBasedPrivilegeProcedure extends ProcedureBase {
throw new IllegalArgumentException("Catalog is already a PrivilegedCatalog");
}
+ if (!(rootCatalog(catalog) instanceof AbstractCatalog)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Catalog %s cannot support Privileged Catalog.",
+ rootCatalog(catalog).getClass().getName()));
+ }
+
Options options = new Options(catalog.options());
PrivilegeManager privilegeManager =
new FileBasedPrivilegeManager(
- catalog.warehouse(),
- catalog.fileIO(),
+ ((AbstractCatalog) rootCatalog(catalog)).warehouse(),
+ ((AbstractCatalog) rootCatalog(catalog)).fileIO(),
options.get(PrivilegedCatalog.USER),
options.get(PrivilegedCatalog.PASSWORD));
privilegeManager.initializePrivilege(rootPassword);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 604b1d9b19..7ddf2eba03 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -120,7 +120,6 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(olderThan),
- OrphanFilesClean.createFileCleaner(catalog, dryRun),
parallelism,
dryRun);
break;
@@ -131,7 +130,6 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(olderThan),
- OrphanFilesClean.createFileCleaner(catalog, dryRun),
parallelism,
dryRun);
break;
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 328a11c017..010a3e4ede 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -44,11 +44,10 @@ import scala.collection.mutable.ArrayBuffer
case class SparkOrphanFilesClean(
specifiedTable: FileStoreTable,
specifiedOlderThanMillis: Long,
- specifiedFileCleaner: SerializableConsumer[Path],
parallelism: Int,
- dryRun: Boolean,
+ dryRunPara: Boolean,
@transient spark: SparkSession)
- extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner)
+ extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, dryRunPara)
with SQLConfHelper
with Logging {
@@ -150,7 +149,7 @@ case class SparkOrphanFilesClean(
val pathToClean = fileInfo.getString(1)
val deletedPath = new Path(pathToClean)
deletedFilesLenInBytes += fileInfo.getLong(2)
- specifiedFileCleaner.accept(deletedPath)
+ cleanFile(deletedPath)
logInfo(s"Cleaned file: $pathToClean")
dataDirs.add(fileInfo.getString(3))
deletedFilesCount += 1
@@ -198,7 +197,6 @@ object SparkOrphanFilesClean extends SQLConfHelper {
databaseName: String,
tableName: String,
olderThanMillis: Long,
- fileCleaner: SerializableConsumer[Path],
parallelismOpt: Integer,
dryRun: Boolean): CleanOrphanFilesResult = {
val spark = SparkSession.active
@@ -230,7 +228,6 @@ object SparkOrphanFilesClean extends SQLConfHelper {
new SparkOrphanFilesClean(
table,
olderThanMillis,
- fileCleaner,
parallelism,
dryRun,
spark