This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ccaafc942 [core] support remove orphan files for database (#3671)
ccaafc942 is described below
commit ccaafc9429209b68bdbb1948a7cc6431b02ca207
Author: wangwj <[email protected]>
AuthorDate: Fri Jul 5 21:24:28 2024 +0800
[core] support remove orphan files for database (#3671)
---
.../apache/paimon/operation/OrphanFilesClean.java | 40 ++++++
.../flink/action/ExpirePartitionsAction.java | 11 +-
.../flink/action/RemoveOrphanFilesAction.java | 91 ++++++++++----
.../action/RemoveOrphanFilesActionFactory.java | 25 ++--
.../procedure/RemoveOrphanFilesProcedure.java | 27 +++--
.../action/RemoveOrphanFilesActionITCase.java | 82 ++++++++++++-
.../procedure/RemoveOrphanFilesProcedure.java | 135 ++++++++++++++++-----
.../procedure/RemoveOrphanFilesProcedureTest.scala | 70 ++++++++++-
8 files changed, 390 insertions(+), 91 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index bccfee93e..d725f3360 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -21,6 +21,8 @@ package org.apache.paimon.operation;
import org.apache.paimon.Changelog;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
@@ -32,8 +34,10 @@ import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
@@ -60,6 +64,7 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
* To remove the data files and metadata files that are not used by table
(so-called "orphan
@@ -82,6 +87,7 @@ public class OrphanFilesClean {
private static final int READ_FILE_RETRY_NUM = 3;
private static final int READ_FILE_RETRY_INTERVAL = 5;
+ public static final int SHOW_LIMIT = 200;
private final SnapshotManager snapshotManager;
private final TagManager tagManager;
@@ -557,4 +563,38 @@ public class OrphanFilesClean {
}
return result;
}
+
+ public static List<Pair<String, OrphanFilesClean>>
constructOrphanFilesCleans(
+ Catalog catalog, String databaseName, @Nullable String tableName)
+ throws Catalog.DatabaseNotExistException,
Catalog.TableNotExistException {
+ List<Pair<String, OrphanFilesClean>> orphanFilesCleans = new
ArrayList<>();
+ List<String> tableNames = Collections.singletonList(tableName);
+ if (tableName == null || "*".equals(tableName)) {
+ tableNames = catalog.listTables(databaseName);
+ }
+
+ for (String t : tableNames) {
+ Identifier identifier = new Identifier(databaseName, t);
+ Table table = catalog.getTable(identifier);
+ checkArgument(
+ table instanceof FileStoreTable,
+ "Only FileStoreTable supports remove-orphan-files action.
The table type is '%s'.",
+ table.getClass().getName());
+
+ orphanFilesCleans.add(Pair.of(t, new
OrphanFilesClean((FileStoreTable) table)));
+ }
+
+ return orphanFilesCleans;
+ }
+
+ public static void initOlderThan(
+ String olderThan, List<Pair<String, OrphanFilesClean>>
tableOrphanFilesCleans) {
+ tableOrphanFilesCleans.forEach(
+ orphanFilesClean ->
orphanFilesClean.getRight().olderThan(olderThan));
+ }
+
+ public static void initDryRun(List<Pair<String, OrphanFilesClean>>
tableOrphanFilesCleans) {
+ tableOrphanFilesCleans.forEach(
+ orphanFilesClean ->
orphanFilesClean.getRight().fileCleaner(path -> {}));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 7d1c4dd5e..fda348be8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -28,13 +28,10 @@ import java.time.Duration;
import java.util.Map;
import java.util.Optional;
-import static org.apache.paimon.catalog.CatalogUtils.table;
-
/** Expire partitions action for Flink. */
public class ExpirePartitionsAction extends TableActionBase {
- private final String expirationTime;
- private final String timestampFormatter;
- private PartitionExpire partitionExpire;
+
+ private final PartitionExpire partitionExpire;
public ExpirePartitionsAction(
String warehouse,
@@ -50,11 +47,9 @@ public class ExpirePartitionsAction extends TableActionBase {
"Only FileStoreTable supports expire_partitions
action. The table type is '%s'.",
table.getClass().getName()));
}
- this.expirationTime = expirationTime;
- this.timestampFormatter = timestampFormatter;
FileStoreTable fileStoreTable = (FileStoreTable) table;
- FileStore fileStore = fileStoreTable.store();
+ FileStore<?> fileStore = fileStoreTable.store();
this.partitionExpire =
new PartitionExpire(
fileStore.partitionType(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
index 035882603..df0101deb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesAction.java
@@ -18,51 +18,92 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
/** Action to remove the orphan data files and metadata files. */
-public class RemoveOrphanFilesAction extends TableActionBase {
- private static final Logger LOG =
LoggerFactory.getLogger(RemoveOrphanFilesAction.class);
+public class RemoveOrphanFilesAction extends ActionBase {
- private final OrphanFilesClean orphanFilesClean;
+ private final List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
public RemoveOrphanFilesAction(
String warehouse,
String databaseName,
- String tableName,
- Map<String, String> catalogConfig) {
- super(warehouse, databaseName, tableName, catalogConfig);
-
- checkArgument(
- table instanceof FileStoreTable,
- "Only FileStoreTable supports remove-orphan-files action. The
table type is '%s'.",
- table.getClass().getName());
- this.orphanFilesClean = new OrphanFilesClean((FileStoreTable) table);
+ @Nullable String tableName,
+ Map<String, String> catalogConfig)
+ throws Catalog.TableNotExistException,
Catalog.DatabaseNotExistException {
+ super(warehouse, catalogConfig);
+ this.tableOrphanFilesCleans =
+ OrphanFilesClean.constructOrphanFilesCleans(catalog,
databaseName, tableName);
}
- public RemoveOrphanFilesAction olderThan(String timestamp) {
- this.orphanFilesClean.olderThan(timestamp);
- return this;
+ public void olderThan(String olderThan) {
+ OrphanFilesClean.initOlderThan(olderThan, this.tableOrphanFilesCleans);
}
- public RemoveOrphanFilesAction dryRun() {
- this.orphanFilesClean.fileCleaner(path -> {});
- return this;
+ public void dryRun() {
+ OrphanFilesClean.initDryRun(this.tableOrphanFilesCleans);
+ }
+
+ public static String[] executeOrphanFilesClean(
+ List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans)
+ throws ExecutionException, InterruptedException {
+ int availableProcessors = Runtime.getRuntime().availableProcessors();
+ ExecutorService executePool =
+ new ThreadPoolExecutor(
+ availableProcessors,
+ availableProcessors,
+ 1,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ExecutorThreadFactory(
+ Thread.currentThread().getName() +
"-RemoveOrphanFiles"));
+
+ List<Future<List<Path>>> tasks = new ArrayList<>();
+ for (Pair<String, OrphanFilesClean> tableOrphanFilesClean :
tableOrphanFilesCleans) {
+ OrphanFilesClean orphanFilesClean =
tableOrphanFilesClean.getRight();
+ Future<List<Path>> task =
+ executePool.submit(
+ () -> {
+ try {
+ return orphanFilesClean.clean();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ tasks.add(task);
+ }
+
+ List<Path> cleanOrphanFiles = new ArrayList<>();
+ for (Future<List<Path>> task : tasks) {
+ cleanOrphanFiles.addAll(task.get());
+ }
+
+ executePool.shutdownNow();
+
+ return OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
+ .toArray(new String[0]);
}
@Override
public void run() throws Exception {
- List<String> result =
OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200);
- String files = String.join(", ", result);
- LOG.info("orphan files: [{}]", files);
+ executeOrphanFilesClean(tableOrphanFilesCleans);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
index 0a461d839..769453d4a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionFactory.java
@@ -18,16 +18,15 @@
package org.apache.paimon.flink.action;
-import org.apache.flink.api.java.tuple.Tuple3;
-
import java.util.Map;
import java.util.Optional;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
/** Factory to create {@link RemoveOrphanFilesAction}. */
public class RemoveOrphanFilesActionFactory implements ActionFactory {
public static final String IDENTIFIER = "remove_orphan_files";
-
private static final String OLDER_THAN = "older_than";
private static final String DRY_RUN = "dry_run";
@@ -38,12 +37,20 @@ public class RemoveOrphanFilesActionFactory implements
ActionFactory {
@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
- Tuple3<String, String, String> tablePath = getTablePath(params);
+ String warehouse = params.get(WAREHOUSE);
+ checkNotNull(warehouse);
+ String database = params.get(DATABASE);
+ checkNotNull(database);
+ String table = params.get(TABLE);
+
Map<String, String> catalogConfig = optionalConfigMap(params,
CATALOG_CONF);
- RemoveOrphanFilesAction action =
- new RemoveOrphanFilesAction(
- tablePath.f0, tablePath.f1, tablePath.f2,
catalogConfig);
+ RemoveOrphanFilesAction action;
+ try {
+ action = new RemoveOrphanFilesAction(warehouse, database, table,
catalogConfig);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
if (params.has(OLDER_THAN)) {
action.olderThan(params.get(OLDER_THAN));
@@ -76,5 +83,9 @@ public class RemoveOrphanFilesActionFactory implements
ActionFactory {
System.out.println(
"When '--dry_run true', view only orphan files, don't actually
remove files. Default is false.");
System.out.println();
+
+ System.out.println(
+ "If the table is null or *, all orphan files in all tables
under the db will be cleaned up.");
+ System.out.println();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 26b637a35..5c7c75850 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,13 +20,14 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.OrphanFilesClean;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.StringUtils;
import org.apache.flink.table.procedure.ProcedureContext;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
+import java.util.List;
+
+import static
org.apache.paimon.flink.action.RemoveOrphanFilesAction.executeOrphanFilesClean;
/**
* Remove orphan files procedure. Usage:
@@ -37,6 +38,9 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
*
* -- use custom file delete interval
* CALL sys.remove_orphan_files('tableId', '2023-12-31 23:59:59')
+ *
+ * -- remove all tables' orphan files in db
+ * CALL sys.remove_orphan_files('databaseName.*', '2023-12-31 23:59:59')
* </code></pre>
*/
public class RemoveOrphanFilesProcedure extends ProcedureBase {
@@ -56,24 +60,21 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
ProcedureContext procedureContext, String tableId, String
olderThan, boolean dryRun)
throws Exception {
Identifier identifier = Identifier.fromString(tableId);
- Table table = catalog.getTable(identifier);
+ String databaseName = identifier.getDatabaseName();
+ String tableName = identifier.getObjectName();
- checkArgument(
- table instanceof FileStoreTable,
- "Only FileStoreTable supports remove-orphan-files action. The
table type is '%s'.",
- table.getClass().getName());
+ List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans =
+ OrphanFilesClean.constructOrphanFilesCleans(catalog,
databaseName, tableName);
- OrphanFilesClean orphanFilesClean = new
OrphanFilesClean((FileStoreTable) table);
if (!StringUtils.isBlank(olderThan)) {
- orphanFilesClean.olderThan(olderThan);
+ OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
}
if (dryRun) {
- orphanFilesClean.fileCleaner(path -> {});
+ OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
}
- return OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200)
- .toArray(new String[0]);
+ return executeOrphanFilesClean(tableOrphanFilesCleans);
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 0c335c726..9e7b6a7c3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -44,8 +44,10 @@ import static org.assertj.core.api.Assertions.assertThatCode;
/** IT cases for {@link RemoveOrphanFilesAction}. */
public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
- @Test
- public void testRunWithoutException() throws Exception {
+ private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
+ private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";
+
+ private FileStoreTable createTableAndWriteData(String tableName) throws
Exception {
RowType rowType =
RowType.of(
new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
@@ -53,6 +55,7 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
FileStoreTable table =
createFileStoreTable(
+ tableName,
rowType,
Collections.emptyList(),
Collections.singletonList("k"),
@@ -65,14 +68,27 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
writeData(rowData(1L, BinaryString.fromString("Hi")));
- Path orphanFile1 = new Path(table.location(), "bucket-0/orphan_file1");
- Path orphanFile2 = new Path(table.location(), "bucket-0/orphan_file2");
+ Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
+ Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
FileIO fileIO = table.fileIO();
fileIO.writeFile(orphanFile1, "a", true);
Thread.sleep(2000);
fileIO.writeFile(orphanFile2, "b", true);
+ return table;
+ }
+
+ private Path getOrphanFilePath(FileStoreTable table, String orphanFile) {
+ return new Path(table.location(), orphanFile);
+ }
+
+ @Test
+ public void testRunWithoutException() throws Exception {
+ FileStoreTable table = createTableAndWriteData(tableName);
+ Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
+ Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
+
List<String> args =
new ArrayList<>(
Arrays.asList(
@@ -117,4 +133,62 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
Row.of(orphanFile1.toUri().getPath()),
Row.of(orphanFile2.toUri().getPath()));
}
+
+ @Test
+ public void testRemoveDatabaseOrphanFilesITCase() throws Exception {
+ FileStoreTable table1 = createTableAndWriteData("tableName1");
+ Path orphanFile11 = getOrphanFilePath(table1, ORPHAN_FILE_1);
+ Path orphanFile12 = getOrphanFilePath(table1, ORPHAN_FILE_2);
+ FileStoreTable table2 = createTableAndWriteData("tableName2");
+ Path orphanFile21 = getOrphanFilePath(table2, ORPHAN_FILE_1);
+ Path orphanFile22 = getOrphanFilePath(table2, ORPHAN_FILE_2);
+
+ List<String> args =
+ new ArrayList<>(
+ Arrays.asList(
+ "remove_orphan_files",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ "*"));
+ RemoveOrphanFilesAction action1 =
createAction(RemoveOrphanFilesAction.class, args);
+ assertThatCode(action1::run).doesNotThrowAnyException();
+
+ args.add("--older_than");
+ args.add("2023-12-31 23:59:59");
+ RemoveOrphanFilesAction action2 =
createAction(RemoveOrphanFilesAction.class, args);
+ assertThatCode(action2::run).doesNotThrowAnyException();
+
+ String withoutOlderThan =
+ String.format("CALL sys.remove_orphan_files('%s.%s')",
database, "*");
+ CloseableIterator<Row> withoutOlderThanCollect =
callProcedure(withoutOlderThan);
+
assertThat(ImmutableList.copyOf(withoutOlderThanCollect).size()).isEqualTo(0);
+
+ String withDryRun =
+ String.format(
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59', true)",
+ database, "*");
+ ImmutableList<Row> actualDryRunDeleteFile =
ImmutableList.copyOf(callProcedure(withDryRun));
+ assertThat(actualDryRunDeleteFile)
+ .containsExactlyInAnyOrder(
+ Row.of(orphanFile11.toUri().getPath()),
+ Row.of(orphanFile12.toUri().getPath()),
+ Row.of(orphanFile21.toUri().getPath()),
+ Row.of(orphanFile22.toUri().getPath()));
+
+ String withOlderThan =
+ String.format(
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59')",
+ database, "*");
+ ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(callProcedure(withOlderThan));
+
+ assertThat(actualDeleteFile)
+ .containsExactlyInAnyOrder(
+ Row.of(orphanFile11.toUri().getPath()),
+ Row.of(orphanFile12.toUri().getPath()),
+ Row.of(orphanFile21.toUri().getPath()),
+ Row.of(orphanFile22.toUri().getPath()));
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index b88b2d169..6c1ad2405 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -18,20 +18,33 @@
package org.apache.paimon.spark.procedure;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.OrphanFilesClean;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
import org.apache.spark.sql.catalyst.InternalRow;
-import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import static org.apache.paimon.operation.OrphanFilesClean.SHOW_LIMIT;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -41,10 +54,15 @@ import static
org.apache.spark.sql.types.DataTypes.StringType;
*
* <pre><code>
* CALL sys.remove_orphan_files(table => 'tableId', [older_than =>
'2023-10-31 12:00:00'])
+ *
+ * CALL sys.remove_orphan_files(table => 'databaseName.*', [older_than =>
'2023-10-31 12:00:00'])
* </code></pre>
*/
public class RemoveOrphanFilesProcedure extends BaseProcedure {
+ private static final Logger LOG =
+
LoggerFactory.getLogger(RemoveOrphanFilesProcedure.class.getName());
+
private static final ProcedureParameter[] PARAMETERS =
new ProcedureParameter[] {
ProcedureParameter.required("table", StringType),
@@ -74,36 +92,93 @@ public class RemoveOrphanFilesProcedure extends
BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
- Identifier tableIdent = toIdentifier(args.getString(0),
PARAMETERS[0].name());
+ org.apache.paimon.catalog.Identifier identifier;
+ String tableId = args.getString(0);
+ Preconditions.checkArgument(
+ tableId != null && !tableId.isEmpty(),
+ "Cannot handle an empty tableId for argument %s",
+ tableId);
+
+ if (tableId.endsWith(".*")) {
+ identifier =
org.apache.paimon.catalog.Identifier.fromString(tableId);
+ } else {
+ identifier =
+ org.apache.paimon.catalog.Identifier.fromString(
+ toIdentifier(args.getString(0),
PARAMETERS[0].name()).toString());
+ }
+ LOG.info("identifier is {}.", identifier);
+
+ List<Pair<String, OrphanFilesClean>> tableOrphanFilesCleans;
+ try {
+ tableOrphanFilesCleans =
+ OrphanFilesClean.constructOrphanFilesCleans(
+ ((WithPaimonCatalog)
tableCatalog()).paimonCatalog(),
+ identifier.getDatabaseName(),
+ identifier.getObjectName());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
String olderThan = args.isNullAt(1) ? null : args.getString(1);
- boolean dryRun = args.isNullAt(2) ? false : args.getBoolean(2);
-
- return modifyPaimonTable(
- tableIdent,
- table -> {
- checkArgument(table instanceof FileStoreTable);
- OrphanFilesClean orphanFilesClean =
- new OrphanFilesClean((FileStoreTable) table);
- if (!StringUtils.isBlank(olderThan)) {
- orphanFilesClean.olderThan(olderThan);
- }
- if (dryRun) {
- orphanFilesClean.fileCleaner(path -> {});
- }
- try {
- List<String> result =
-
OrphanFilesClean.showDeletedFiles(orphanFilesClean.clean(), 200);
- InternalRow[] rows = new InternalRow[result.size()];
- int index = 0;
- for (String line : result) {
- rows[index] =
newInternalRow(UTF8String.fromString(line));
- index++;
- }
- return rows;
- } catch (Exception e) {
- throw new RuntimeException("Call remove_orphan_files
error", e);
- }
- });
+ if (!StringUtils.isBlank(olderThan)) {
+ OrphanFilesClean.initOlderThan(olderThan, tableOrphanFilesCleans);
+ }
+
+ boolean dryRun = !args.isNullAt(2) && args.getBoolean(2);
+ if (dryRun) {
+ OrphanFilesClean.initDryRun(tableOrphanFilesCleans);
+ }
+
+ int availableProcessors = Runtime.getRuntime().availableProcessors();
+ ExecutorService executePool =
+ new ThreadPoolExecutor(
+ availableProcessors,
+ availableProcessors,
+ 1,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ExecutorThreadFactory(
+ Thread.currentThread().getName() +
"-RemoveOrphanFiles"));
+ List<Future<List<Path>>> tasks = new ArrayList<>();
+ for (Pair<String, OrphanFilesClean> tableOrphanFilesClean :
tableOrphanFilesCleans) {
+ String tableName = tableOrphanFilesClean.getLeft();
+ OrphanFilesClean orphanFilesClean =
tableOrphanFilesClean.getRight();
+ Future<List<Path>> task =
+ executePool.submit(
+ () ->
+ modifyPaimonTable(
+ toIdentifier(tableName, tableName),
+ table -> {
+ checkArgument(table instanceof
FileStoreTable);
+ try {
+ return
orphanFilesClean.clean();
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Call
remove_orphan_files error", e);
+ }
+ }));
+ tasks.add(task);
+ }
+
+ List<Path> cleanOrphanFiles = new ArrayList<>();
+ for (Future<List<Path>> task : tasks) {
+ try {
+ cleanOrphanFiles.addAll(task.get());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ executePool.shutdownNow();
+
+ List<InternalRow> showLimitedDeletedFiles = new
ArrayList<>(cleanOrphanFiles.size());
+ OrphanFilesClean.showDeletedFiles(cleanOrphanFiles, SHOW_LIMIT)
+ .forEach(
+ deletedFile ->
+ showLimitedDeletedFiles.add(
+
newInternalRow(UTF8String.fromString(deletedFile))));
+
+ return showLimitedDeletedFiles.toArray(new InternalRow[0]);
}
public static ProcedureBuilder builder() {
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index c0bf84c4d..23a014d0f 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -28,6 +28,9 @@ import java.util.concurrent.TimeUnit
class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
+ private val ORPHAN_FILE_1 = "bucket-0/orphan_file1"
+ private val ORPHAN_FILE_2 = "bucket-0/orphan_file2"
+
test("Paimon procedure: remove orphan files") {
spark.sql(s"""
|CREATE TABLE T (id STRING, name STRING)
@@ -41,8 +44,8 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
val fileIO = table.fileIO()
val tablePath = table.location()
- val orphanFile1 = new Path(tablePath, "bucket-0/orphan_file1")
- val orphanFile2 = new Path(tablePath, "bucket-0/orphan_file2")
+ val orphanFile1 = new Path(tablePath, ORPHAN_FILE_1)
+ val orphanFile2 = new Path(tablePath, ORPHAN_FILE_2)
fileIO.tryToWriteAtomic(orphanFile1, "a")
Thread.sleep(2000)
@@ -84,8 +87,8 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
val fileIO = table.fileIO()
val tablePath = table.location()
- val orphanFile1 = new Path(tablePath, "bucket-0/orphan_file1")
- val orphanFile2 = new Path(tablePath, "bucket-0/orphan_file2")
+ val orphanFile1 = new Path(tablePath, ORPHAN_FILE_1)
+ val orphanFile2 = new Path(tablePath, ORPHAN_FILE_2)
fileIO.writeFile(orphanFile1, "a", true)
Thread.sleep(2000)
@@ -104,4 +107,63 @@ class RemoveOrphanFilesProcedureTest extends
PaimonSparkTestBase {
Row(orphanFile1.toUri.getPath) :: Row(orphanFile2.toUri.getPath) :: Nil
)
}
+
+ test("Paimon procedure: remove database orphan files") {
+ spark.sql(s"""
+ |CREATE TABLE T1 (id STRING, name STRING)
+ |USING PAIMON
+ |TBLPROPERTIES ('primary-key'='id')
+ |""".stripMargin)
+ spark.sql(s"INSERT INTO T1 VALUES ('1', 'a'), ('2', 'b')")
+
+ spark.sql(s"""
+ |CREATE TABLE T2 (id STRING, name STRING)
+ |USING PAIMON
+ |TBLPROPERTIES ('primary-key'='id')
+ |""".stripMargin)
+ spark.sql(s"INSERT INTO T2 VALUES ('1', 'a'), ('2', 'b')")
+
+ val table1 = loadTable("T1")
+ val table2 = loadTable("T2")
+ val fileIO1 = table1.fileIO()
+ val fileIO2 = table2.fileIO()
+ val tablePath1 = table1.location()
+ val tablePath2 = table2.location()
+
+ val orphanFile11 = new Path(tablePath1, ORPHAN_FILE_1)
+ val orphanFile12 = new Path(tablePath1, ORPHAN_FILE_2)
+ val orphanFile21 = new Path(tablePath2, ORPHAN_FILE_1)
+ val orphanFile22 = new Path(tablePath2, ORPHAN_FILE_2)
+
+ fileIO1.tryToWriteAtomic(orphanFile11, "a")
+ fileIO2.tryToWriteAtomic(orphanFile21, "a")
+ Thread.sleep(2000)
+ fileIO1.tryToWriteAtomic(orphanFile12, "b")
+ fileIO2.tryToWriteAtomic(orphanFile22, "b")
+
+ // by default, no file deleted
+ checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"),
Nil)
+
+ val orphanFile12ModTime =
fileIO1.getFileStatus(orphanFile12).getModificationTime
+ val older_than1 = DateTimeUtils.formatLocalDateTime(
+ DateTimeUtils.toLocalDateTime(
+ orphanFile12ModTime -
+ TimeUnit.SECONDS.toMillis(1)),
+ 3)
+
+ checkAnswer(
+ spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than
=> '$older_than1')"),
+ Row(orphanFile11.toUri.getPath) :: Row(orphanFile21.toUri.getPath) :: Nil
+ )
+
+ val older_than2 = DateTimeUtils.formatLocalDateTime(
+ DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
+ 3)
+
+ checkAnswer(
+ spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than
=> '$older_than2')"),
+ Row(orphanFile12.toUri.getPath) :: Row(orphanFile22.toUri.getPath) :: Nil
+ )
+ }
+
}