This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4362b79429 [core] Purge files use Table API and no dry run (#5448)
4362b79429 is described below
commit 4362b79429142b0777d162a9d54d4fe1300f4a6c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 11 14:57:43 2025 +0800
[core] Purge files use Table API and no dry run (#5448)
---
docs/content/flink/procedures.md | 18 +-------
docs/content/spark/procedures.md | 4 +-
.../paimon/table/AbstractFileStoreTable.java | 5 +++
.../java/org/apache/paimon/table/DataTable.java | 3 ++
.../paimon/table/DelegatedFileStoreTable.java | 6 +++
.../org/apache/paimon/table/FileStoreTable.java | 47 ++++++++++++++++++++
.../apache/paimon/table/system/AuditLogTable.java | 5 +++
.../paimon/table/system/CompactBucketsTable.java | 6 +++
.../paimon/table/system/FileMonitorTable.java | 6 +++
.../paimon/table/system/ReadOptimizedTable.java | 6 +++
.../flink/procedure/PurgeFilesProcedure.java | 47 ++------------------
.../flink/procedure/PurgeFilesProcedure.java | 50 +++-------------------
.../flink/procedure/PurgeFilesProcedureITCase.java | 38 +---------------
.../spark/procedure/PurgeFilesProcedure.java | 49 +++------------------
.../spark/procedure/PurgeFilesProcedureTest.scala | 33 +-------------
15 files changed, 105 insertions(+), 218 deletions(-)
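
In short, this commit replaces the per-engine directory-listing purge logic with a single FileStoreTable#purgeFiles() Table API call and removes the dry_run option. A minimal sketch of the new call path from plain Java, assuming a filesystem catalog (the warehouse path and table name below are illustrative, not part of this commit):

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.catalog.CatalogFactory;
    import org.apache.paimon.catalog.Identifier;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.table.FileStoreTable;

    public class PurgeFilesExample {
        public static void main(String[] args) throws Exception {
            // Illustrative filesystem warehouse; any Paimon catalog works the same way.
            Catalog catalog =
                    CatalogFactory.createCatalog(CatalogContext.create(new Path("/tmp/warehouse")));
            FileStoreTable table =
                    (FileStoreTable) catalog.getTable(Identifier.fromString("default.T"));
            // One call now drops branches, tags and consumers, truncates the table,
            // deletes changelogs, expires all but the latest snapshot, and cleans
            // orphan files. There is no dry-run mode anymore.
            table.purgeFiles();
        }
    }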
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 26cf30c1df..ffacc3d717 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -486,29 +486,15 @@ All available procedures are listed below.
<tr>
<td>purge_files</td>
<td>
- -- for Flink 1.18<br/>
- -- clear table with purge files directly.<br/>
+ -- clear table with purge files.<br/>
CALL [catalog.]sys.purge_files('identifier')<br/>
- -- only check what dirs will be deleted, but not really delete them.<br/>
- CALL [catalog.]sys.purge_files('identifier', true)<br/><br/>
- -- for Flink 1.19 and later<br/>
- -- clear table with purge files directly.<br/>
- CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/>
- -- only check what dirs will be deleted, but not really delete them.<br/>
- CALL [catalog.]sys.purge_files(`table` => 'default.T', `dry_run` => true)<br/><br/>
</td>
<td>
- To clear table with purge files directly. Argument:
+ To clear table with purge files. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
- <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
</td>
<td>
- -- for Flink 1.18<br/>
CALL sys.purge_files('default.T')<br/>
- CALL sys.purge_files('default.T', true)<br/><br/>
- -- for Flink 1.19 and later<br/>
- CALL sys.purge_files(`table` => 'default.T')<br/>
- CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)
</td>
</tr>
<tr>
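
The simplified call can also be issued programmatically from Flink; a hedged sketch using a batch TableEnvironment (catalog name and warehouse path are assumptions, not part of this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class CallPurgeFiles {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
            // Assumed catalog registration; adjust type/warehouse for your setup.
            tEnv.executeSql(
                    "CREATE CATALOG my_catalog WITH ("
                            + "'type' = 'paimon', 'warehouse' = 'file:/tmp/warehouse')");
            tEnv.executeSql("USE CATALOG my_catalog");
            // After this change, purge_files takes only the table identifier.
            tEnv.executeSql("CALL sys.purge_files(`table` => 'default.T')").print();
        }
    }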
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 07afd315e3..be667e67f2 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -198,13 +198,11 @@ This section introduce all available spark procedures about paimon.
<tr>
<td>purge_files</td>
<td>
- To clear table with purge files directly. Argument:
+ To clear table with purge files. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
- <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
</td>
<td>
CALL sys.purge_files(table => 'default.T')<br/>
- CALL sys.purge_files(table => 'default.T', dry_run => true)
</td>
</tr>
<tr>
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 9793a6a52a..f68bd9615e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -452,6 +452,11 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
options.forceCreatingSnapshot());
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return new ConsumerManager(fileIO, path, snapshotManager().branch());
+ }
+
@Nullable
protected Runnable newExpireRunnable() {
CoreOptions options = coreOptions();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
index dd0ab68254..dd39a1610b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DataTable.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.source.DataTableScan;
@@ -42,6 +43,8 @@ public interface DataTable extends InnerTable {
ChangelogManager changelogManager();
+ ConsumerManager consumerManager();
+
SchemaManager schemaManager();
TagManager tagManager();
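
The new consumerManager() accessor on DataTable is what lets the purge path clear registered consumers. The same pattern, pulled out as a standalone sketch (dropAllConsumers is a hypothetical helper, not part of this commit):

    import org.apache.paimon.consumer.ConsumerManager;
    import org.apache.paimon.table.DataTable;

    class ConsumerCleanup {
        // Sketch: drop every registered consumer, mirroring a step of purgeFiles().
        static void dropAllConsumers(DataTable table) {
            ConsumerManager consumerManager = table.consumerManager();
            consumerManager.consumers().keySet().forEach(consumerManager::deleteConsumer);
        }
    }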
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
index 0e0967fe89..dc4aeec073 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/DelegatedFileStoreTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -105,6 +106,11 @@ public abstract class DelegatedFileStoreTable implements FileStoreTable {
return wrapped.schemaManager();
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return wrapped.consumerManager();
+ }
+
@Override
public TagManager tagManager() {
return wrapped.tagManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 61aa77d5f3..92b0fc7b6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -20,20 +20,28 @@ package org.apache.paimon.table;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestCacheFilter;
+import org.apache.paimon.operation.LocalOrphanFilesClean;
+import org.apache.paimon.options.ExpireConfig;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.RowKeyExtractor;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.ChangelogManager;
import org.apache.paimon.utils.SegmentsCache;
+import org.apache.paimon.utils.TagManager;
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache;
+import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -120,4 +128,43 @@ public interface FileStoreTable extends DataTable {
*/
@Override
FileStoreTable switchToBranch(String branchName);
+
+ /** Purge all files in this table. */
+ default void purgeFiles() throws Exception {
+ // clear branches
+ BranchManager branchManager = branchManager();
+ branchManager.branches().forEach(branchManager::dropBranch);
+
+ // clear tags
+ TagManager tagManager = tagManager();
+ tagManager.allTagNames().forEach(this::deleteTag);
+
+ // clear consumers
+ ConsumerManager consumerManager = this.consumerManager();
+ consumerManager.consumers().keySet().forEach(consumerManager::deleteConsumer);
+
+ // truncate table
+ try (BatchTableCommit commit = this.newBatchWriteBuilder().newCommit()) {
+ commit.truncateTable();
+ }
+
+ // clear changelogs
+ ChangelogManager changelogManager = this.changelogManager();
+ this.fileIO().delete(changelogManager.changelogDirectory(), true);
+
+ // clear snapshots, keep only latest snapshot
+ this.newExpireSnapshots()
+ .config(
+ ExpireConfig.builder()
+ .snapshotMaxDeletes(Integer.MAX_VALUE)
+ .snapshotRetainMax(1)
+ .snapshotRetainMin(1)
+ .snapshotTimeRetain(Duration.ZERO)
+ .build())
+ .expire();
+
+ // clear orphan files
+ LocalOrphanFilesClean clean = new LocalOrphanFilesClean(this, System.currentTimeMillis());
+ clean.clean();
+ }
}
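
The ordering in purgeFiles() matters: truncateTable() commits a fresh empty snapshot, the expire step then retains exactly that one snapshot, and the orphan-files clean sweeps anything no longer referenced. A small sanity-check sketch, mirroring the assertions added to the tests below (singleSnapshotRemains is a hypothetical helper):

    import org.apache.paimon.table.FileStoreTable;

    class PurgeCheck {
        // After purgeFiles(), earliest == latest: one (empty) snapshot is left,
        // so both reads and new writes keep working against the table.
        static boolean singleSnapshotRemains(FileStoreTable table) {
            Long latest = table.snapshotManager().latestSnapshotId();
            Long earliest = table.snapshotManager().earliestSnapshotId();
            return latest != null && latest.equals(earliest);
        }
    }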
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 28b10bddd2..eabb1a6120 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -193,6 +193,11 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
return wrapped.changelogManager();
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return wrapped.consumerManager();
+ }
+
@Override
public SchemaManager schemaManager() {
return wrapped.schemaManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
index de6b29dee4..8ffae30bd7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/CompactBucketsTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -152,6 +153,11 @@ public class CompactBucketsTable implements DataTable, ReadonlyTable {
return wrapped.changelogManager();
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return wrapped.consumerManager();
+ }
+
@Override
public SchemaManager schemaManager() {
return wrapped.schemaManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
index f3fafaa91e..060570406b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.Experimental;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
@@ -138,6 +139,11 @@ public class FileMonitorTable implements DataTable, ReadonlyTable {
return wrapped.changelogManager();
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return wrapped.consumerManager();
+ }
+
@Override
public SchemaManager schemaManager() {
return wrapped.schemaManager();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
index 1e252fc088..639b711c29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/ReadOptimizedTable.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.system;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -173,6 +174,11 @@ public class ReadOptimizedTable implements DataTable, ReadonlyTable {
return wrapped.changelogManager();
}
+ @Override
+ public ConsumerManager consumerManager() {
+ return wrapped.consumerManager();
+ }
+
@Override
public SchemaManager schemaManager() {
return wrapped.schemaManager();
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 2901fd1f59..a968d441d6 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -18,58 +18,19 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
import org.apache.flink.table.procedure.ProcedureContext;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
/** A procedure to purge files for a table. */
public class PurgeFilesProcedure extends ProcedureBase {
- public static final String IDENTIFIER = "purge_files";
- public String[] call(ProcedureContext procedureContext, String tableId)
- throws Catalog.TableNotExistException {
- return call(procedureContext, tableId, false);
- }
+ public static final String IDENTIFIER = "purge_files";
- public String[] call(ProcedureContext procedureContext, String tableId, boolean dryRun)
- throws Catalog.TableNotExistException {
- Table table = catalog.getTable(Identifier.fromString(tableId));
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- FileIO fileIO = fileStoreTable.fileIO();
- Path tablePath = fileStoreTable.snapshotManager().tablePath();
- ArrayList<String> deleteDir;
- try {
- FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
- deleteDir = new ArrayList<>(fileStatuses.length);
- Arrays.stream(fileStatuses)
- .filter(f -> !f.getPath().getName().contains("schema"))
- .forEach(
- fileStatus -> {
- try {
- deleteDir.add(fileStatus.getPath().getName());
- if (!dryRun) {
- fileIO.delete(fileStatus.getPath(), true);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return deleteDir.isEmpty()
- ? new String[] {"There are no dir to be deleted."}
- : deleteDir.toArray(new String[0]);
+ public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
+ ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
+ return new String[] {"Success"};
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 64529db3c8..db35fc6c9d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -18,13 +18,8 @@
package org.apache.paimon.flink.procedure;
-import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
@@ -32,10 +27,6 @@ import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
/**
* A procedure to purge files for a table. Usage:
*
@@ -48,42 +39,11 @@ public class PurgeFilesProcedure extends ProcedureBase {
public static final String IDENTIFIER = "purge_files";
- @ProcedureHint(
- argument = {
- @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true)
- })
- public @DataTypeHint("ROW<purged_file_path STRING>") Row[] call(
- ProcedureContext procedureContext, String tableId, Boolean dryRun)
- throws Catalog.TableNotExistException {
- Table table = catalog.getTable(Identifier.fromString(tableId));
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- FileIO fileIO = fileStoreTable.fileIO();
- Path tablePath = fileStoreTable.snapshotManager().tablePath();
- ArrayList<String> deleteDir;
- try {
- FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
- deleteDir = new ArrayList<>(fileStatuses.length);
- Arrays.stream(fileStatuses)
- .filter(f -> !f.getPath().getName().contains("schema"))
- .forEach(
- fileStatus -> {
- try {
- deleteDir.add(fileStatus.getPath().getName());
- if (dryRun == null || !dryRun) {
- fileIO.delete(fileStatus.getPath(), true);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- return deleteDir.isEmpty()
- ? new Row[] {Row.of("There are no dir to be deleted.")}
- : deleteDir.stream().map(Row::of).toArray(Row[]::new);
+ @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
+ public @DataTypeHint("ROW<result STRING>") Row[] call(
+ ProcedureContext procedureContext, String tableId) throws Exception {
+ ((FileStoreTable) catalog.getTable(Identifier.fromString(tableId))).purgeFiles();
+ return new Row[] {Row.of("Success")};
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
index 6cd4351783..23ad3c9507 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
@@ -29,7 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat;
public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
@Test
- public void testPurgeFiles() throws Exception {
+ public void testPurgeFiles() {
sql(
"CREATE TABLE T (id INT, name STRING,"
+ " PRIMARY KEY (id) NOT ENFORCED)"
@@ -41,41 +41,7 @@ public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
sql("INSERT INTO T VALUES (1, 'a')");
sql("CALL sys.purge_files(`table` => 'default.T')");
assertThat(sql("select * from `T`")).containsExactly();
-
- sql("INSERT INTO T VALUES (2, 'a')");
- assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
- }
-
- @Test
- public void testPurgeFilesDryRun() {
- sql(
- "CREATE TABLE T (id INT, name STRING,"
- + " PRIMARY KEY (id) NOT ENFORCED)"
- + " WITH ('bucket'='1')");
- // There are no dir to delete.
- assertThat(
- sql("CALL sys.purge_files(`table` => 'default.T',
`dry_run` => true)")
- .stream()
- .map(row -> row.getField(0)))
- .containsExactlyInAnyOrder("There are no dir to be deleted.");
-
- sql("INSERT INTO T VALUES (1, 'a')");
- assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
-
- // dry run.
- assertThat(
- sql("CALL sys.purge_files(`table` => 'default.T',
`dry_run` => true)")
- .stream()
- .map(row -> row.getField(0)))
- .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
-
- assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
-
- assertThat(
- sql("CALL sys.purge_files(`table` =>
'default.T')").stream()
- .map(row -> row.getField(0)))
- .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
- assertThat(sql("select * from `T`")).containsExactly();
+ assertThat(sql("select snapshot_id from `T$snapshots`")).hasSize(1);
sql("INSERT INTO T VALUES (2, 'a')");
assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
index 3def44b5ae..75444cb0ad 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
@@ -18,9 +18,6 @@
package org.apache.paimon.spark.procedure;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.FileStatus;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.spark.sql.catalyst.InternalRow;
@@ -31,26 +28,18 @@ import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/** A procedure to purge files for a table. */
public class PurgeFilesProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {
- ProcedureParameter.required("table", StringType),
- ProcedureParameter.optional("dry_run", BooleanType)
- };
+ new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("purged_file_path", StringType, true,
Metadata.empty())
+ new StructField("result", StringType, true,
Metadata.empty())
});
private PurgeFilesProcedure(TableCatalog tableCatalog) {
@@ -70,44 +59,16 @@ public class PurgeFilesProcedure extends BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
- boolean dryRun = !args.isNullAt(1) && args.getBoolean(1);
-
return modifyPaimonTable(
tableIdent,
table -> {
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- FileIO fileIO = fileStoreTable.fileIO();
- Path tablePath = fileStoreTable.snapshotManager().tablePath();
- ArrayList<String> deleteDir;
try {
- FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
- deleteDir = new ArrayList<>(fileStatuses.length);
- Arrays.stream(fileStatuses)
- .filter(f -> !f.getPath().getName().contains("schema"))
- .forEach(
- fileStatus -> {
- try {
- deleteDir.add(fileStatus.getPath().getName());
- if (!dryRun) {
- fileIO.delete(fileStatus.getPath(), true);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
- spark().catalog().refreshTable(table.fullName());
- } catch (IOException e) {
+ ((FileStoreTable) table).purgeFiles();
+ } catch (Exception e) {
throw new RuntimeException(e);
}
- return deleteDir.isEmpty()
- ? new InternalRow[] {
- newInternalRow(
- UTF8String.fromString("There are no dir to be deleted."))
- }
- : deleteDir.stream()
- .map(x -> newInternalRow(UTF8String.fromString(x)))
- .toArray(InternalRow[]::new);
+ return new InternalRow[] {newInternalRow(UTF8String.fromString("Success"))};
});
}
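
On the Spark side the procedure now takes only the table argument and returns a single "Success" row. A hedged sketch of invoking it from a Java SparkSession (catalog name, warehouse path, and master are assumptions, not part of this commit):

    import org.apache.spark.sql.SparkSession;

    public class SparkPurgeFiles {
        public static void main(String[] args) {
            SparkSession spark =
                    SparkSession.builder()
                            .master("local[*]")
                            .config("spark.sql.catalog.paimon",
                                    "org.apache.paimon.spark.SparkCatalog")
                            .config("spark.sql.catalog.paimon.warehouse", "file:/tmp/warehouse")
                            .getOrCreate();
            // One result row ("Success") instead of one row per purged path.
            spark.sql("CALL paimon.sys.purge_files(table => 'default.T')").show();
            spark.stop();
        }
    }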
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
index 02fa60f1e0..9bf81b56ef 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.procedure
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.spark.sql.Row
+import org.assertj.core.api.Assertions.assertThat
class PurgeFilesProcedureTest extends PaimonSparkTestBase {
@@ -35,40 +36,10 @@ class PurgeFilesProcedureTest extends PaimonSparkTestBase {
spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
checkAnswer(spark.sql("select * from test.T"), Nil)
+ assertThat(spark.sql("select snapshot_id from test.`T$snapshots`").collect()).hasSize(1)
spark.sql("insert into T select '2', 'aa'");
checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
}
- test("Paimon procedure: purge files test with dry run.") {
- spark.sql(s"""
- |CREATE TABLE T (id STRING, name STRING)
- |USING PAIMON
- |""".stripMargin)
-
- // There are no dir to be deleted.
- checkAnswer(
- spark.sql("CALL paimon.sys.purge_files(table => 'test.T')"),
- Row("There are no dir to be deleted.") :: Nil
- )
-
- spark.sql("insert into T select '1', 'aa'");
- checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
-
- // dry run.
- checkAnswer(
- spark.sql("CALL paimon.sys.purge_files(table => 'test.T', dry_run =>
true)"),
- Row("snapshot") :: Row("bucket-0") :: Row("manifest") :: Nil
- )
- checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
-
- // Do delete.
- spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
- checkAnswer(spark.sql("select * from test.T"), Nil)
-
- // insert new data.
- spark.sql("insert into T select '2', 'aa'");
- checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
- }
-
}