This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new be461dcf4a [core] Add dry_run parameters to purge_files Procedure and display the list of deleted directories (#5342)
be461dcf4a is described below
commit be461dcf4a4976d9a90c4e82ba211dbc62c1ba96
Author: HunterXHunter <[email protected]>
AuthorDate: Thu Mar 27 17:48:40 2025 +0800
[core] Add dry_run parameters to purge_files Procedure and display the list of deleted directories (#5342)
---
docs/content/flink/procedures.md                   | 15 ++++++---
docs/content/spark/procedures.md                   |  4 ++-
.../flink/procedure/PurgeFilesProcedure.java       | 23 +++++++++++---
.../flink/procedure/PurgeFilesProcedure.java       | 29 ++++++++++++-----
.../flink/procedure/PurgeFilesProcedureITCase.java | 35 +++++++++++++++++++++
.../spark/procedure/PurgeFilesProcedure.java       | 36 +++++++++++++++-------
.../spark/procedure/PurgeFilesProcedureTest.scala  | 31 +++++++++++++++++++
7 files changed, 145 insertions(+), 28 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 49507721a9..e3013c1384 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -488,20 +488,27 @@ All available procedures are listed below.
<td>
-- for Flink 1.18<br/>
-- clear table with purge files directly.<br/>
- CALL [catalog.]sys.purge_files('identifier')<br/><br/>
+ CALL [catalog.]sys.purge_files('identifier')<br/>
+ -- only check what dirs will be deleted, but not really delete them.<br/>
+ CALL [catalog.]sys.purge_files('identifier', true)<br/><br/>
-- for Flink 1.19 and later<br/>
-- clear table with purge files directly.<br/>
- CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/><br/>
+ CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/>
+ -- only check what dirs will be deleted, but not really delete them.<br/>
+ CALL [catalog.]sys.purge_files(`table` => 'default.T', `dry_run` => true)<br/><br/>
</td>
<td>
To clear table with purge files directly. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
+ <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
</td>
<td>
-- for Flink 1.18<br/>
- CALL sys.purge_files('default.T')<br/><br/>
+ CALL sys.purge_files('default.T')<br/>
+ CALL sys.purge_files('default.T', true)<br/><br/>
-- for Flink 1.19 and later<br/>
- CALL sys.purge_files(`table` => 'default.T')
+ CALL sys.purge_files(`table` => 'default.T')<br/>
+ CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)
</td>
</tr>
<tr>
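For context, the new Flink 1.19+ syntax documented above boils down to the following (a sketch, assuming a Paimon catalog is active and a table `default.T` exists as a placeholder):

    -- preview which top-level table dirs would be purged, without deleting
    CALL sys.purge_files(`table` => 'default.T', `dry_run` => true);
    -- actually purge; dry_run defaults to false
    CALL sys.purge_files(`table` => 'default.T');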
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index fb46b6266c..21d40e610d 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -200,9 +200,11 @@ This section introduce all available spark procedures about paimon.
<td>
To clear table with purge files directly. Argument:
<li>table: the target table identifier. Cannot be empty.</li>
+ <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
</td>
<td>
- CALL sys.purge_files(table => 'default.T')<br/><br/>
+ CALL sys.purge_files(table => 'default.T')<br/>
+ CALL sys.purge_files(table => 'default.T', dry_run => true)
</td>
</tr>
<tr>
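The Spark form is the same call without backticks around the named arguments (again a sketch with placeholder identifiers):

    CALL sys.purge_files(table => 'default.T', dry_run => true);
    CALL sys.purge_files(table => 'default.T');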
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 3053eae3c7..2901fd1f59 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -28,6 +29,7 @@ import org.apache.paimon.table.Table;
import org.apache.flink.table.procedure.ProcedureContext;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
/** A procedure to purge files for a table. */
@@ -36,17 +38,28 @@ public class PurgeFilesProcedure extends ProcedureBase {
public String[] call(ProcedureContext procedureContext, String tableId)
throws Catalog.TableNotExistException {
+ return call(procedureContext, tableId, false);
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId, boolean dryRun)
+ throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileIO fileIO = fileStoreTable.fileIO();
Path tablePath = fileStoreTable.snapshotManager().tablePath();
+ ArrayList<String> deleteDir;
try {
- Arrays.stream(fileIO.listStatus(tablePath))
+ FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
+ deleteDir = new ArrayList<>(fileStatuses.length);
+ Arrays.stream(fileStatuses)
.filter(f -> !f.getPath().getName().contains("schema"))
.forEach(
fileStatus -> {
try {
- fileIO.delete(fileStatus.getPath(), true);
+ deleteDir.add(fileStatus.getPath().getName());
+ if (!dryRun) {
+ fileIO.delete(fileStatus.getPath(), true);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -54,9 +67,9 @@ public class PurgeFilesProcedure extends ProcedureBase {
} catch (IOException e) {
throw new RuntimeException(e);
}
- return new String[] {
- String.format("Success purge files with table: %s.",
fileStoreTable.name())
- };
+ return deleteDir.isEmpty()
+ ? new String[] {"There are no dir to be deleted."}
+ : deleteDir.toArray(new String[0]);
}
@Override
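Since Flink 1.18 lacks named-argument support, this overload keeps dry_run positional; a minimal sketch against the same placeholder table:

    -- second positional argument is dry_run
    CALL sys.purge_files('default.T', true);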
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 7ee2a36104..64529db3c8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -29,8 +30,10 @@ import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
/**
@@ -45,20 +48,31 @@ public class PurgeFilesProcedure extends ProcedureBase {
public static final String IDENTIFIER = "purge_files";
- @ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
- public String[] call(ProcedureContext procedureContext, String tableId)
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "dry_run", type =
@DataTypeHint("BOOLEAN"), isOptional = true)
+ })
+ public @DataTypeHint("ROW<purged_file_path STRING>") Row[] call(
+ ProcedureContext procedureContext, String tableId, Boolean dryRun)
throws Catalog.TableNotExistException {
Table table = catalog.getTable(Identifier.fromString(tableId));
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileIO fileIO = fileStoreTable.fileIO();
Path tablePath = fileStoreTable.snapshotManager().tablePath();
+ ArrayList<String> deleteDir;
try {
- Arrays.stream(fileIO.listStatus(tablePath))
+ FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
+ deleteDir = new ArrayList<>(fileStatuses.length);
+ Arrays.stream(fileStatuses)
.filter(f -> !f.getPath().getName().contains("schema"))
.forEach(
fileStatus -> {
try {
- fileIO.delete(fileStatus.getPath(), true);
+ deleteDir.add(fileStatus.getPath().getName());
+ if (dryRun == null || !dryRun) {
+ fileIO.delete(fileStatus.getPath(), true);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -66,9 +80,10 @@ public class PurgeFilesProcedure extends ProcedureBase {
} catch (IOException e) {
throw new RuntimeException(e);
}
- return new String[] {
- String.format("Success purge files with table: %s.",
fileStoreTable.name())
- };
+
+ return deleteDir.isEmpty()
+ ? new Row[] {Row.of("There are no dir to be deleted.")}
+ : deleteDir.stream().map(Row::of).toArray(Row[]::new);
}
@Override
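Note the output change in this version of the procedure: instead of a single success message it now returns one ROW&lt;purged_file_path STRING&gt; per affected directory. Judging from the ITCase below, a dry run on a freshly written table would yield rows roughly like this (illustrative, not verbatim output):

    CALL sys.purge_files(`table` => 'default.T', `dry_run` => true);
    -- purged_file_path
    -- ----------------
    -- snapshot
    -- bucket-0
    -- manifest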
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
index 9eb9aad296..6cd4351783 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
@@ -45,4 +45,39 @@ public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
sql("INSERT INTO T VALUES (2, 'a')");
assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
}
+
+ @Test
+ public void testPurgeFilesDryRun() {
+ sql(
+ "CREATE TABLE T (id INT, name STRING,"
+ + " PRIMARY KEY (id) NOT ENFORCED)"
+ + " WITH ('bucket'='1')");
+ // There are no dir to delete.
+ assertThat(
+ sql("CALL sys.purge_files(`table` => 'default.T',
`dry_run` => true)")
+ .stream()
+ .map(row -> row.getField(0)))
+ .containsExactlyInAnyOrder("There are no dir to be deleted.");
+
+ sql("INSERT INTO T VALUES (1, 'a')");
+ assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+ // dry run.
+ assertThat(
+ sql("CALL sys.purge_files(`table` => 'default.T',
`dry_run` => true)")
+ .stream()
+ .map(row -> row.getField(0)))
+ .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
+
+ assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+ assertThat(
+ sql("CALL sys.purge_files(`table` =>
'default.T')").stream()
+ .map(row -> row.getField(0)))
+ .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
+ assertThat(sql("select * from `T`")).containsExactly();
+
+ sql("INSERT INTO T VALUES (2, 'a')");
+ assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
index 9db724294a..3def44b5ae 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.table.FileStoreTable;
@@ -31,20 +32,25 @@ import org.apache.spark.sql.types.StructType;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
import static org.apache.spark.sql.types.DataTypes.StringType;
/** A procedure to purge files for a table. */
public class PurgeFilesProcedure extends BaseProcedure {
private static final ProcedureParameter[] PARAMETERS =
- new ProcedureParameter[] {ProcedureParameter.required("table", StringType)};
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.optional("dry_run", BooleanType)
+ };
private static final StructType OUTPUT_TYPE =
new StructType(
new StructField[] {
- new StructField("result", StringType, true,
Metadata.empty())
+ new StructField("purged_file_path", StringType, true,
Metadata.empty())
});
private PurgeFilesProcedure(TableCatalog tableCatalog) {
@@ -64,6 +70,7 @@ public class PurgeFilesProcedure extends BaseProcedure {
@Override
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ boolean dryRun = !args.isNullAt(1) && args.getBoolean(1);
return modifyPaimonTable(
tableIdent,
@@ -71,13 +78,19 @@ public class PurgeFilesProcedure extends BaseProcedure {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileIO fileIO = fileStoreTable.fileIO();
Path tablePath = fileStoreTable.snapshotManager().tablePath();
+ ArrayList<String> deleteDir;
try {
- Arrays.stream(fileIO.listStatus(tablePath))
+ FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
+ deleteDir = new ArrayList<>(fileStatuses.length);
+ Arrays.stream(fileStatuses)
.filter(f -> !f.getPath().getName().contains("schema"))
.forEach(
fileStatus -> {
try {
- fileIO.delete(fileStatus.getPath(), true);
+ deleteDir.add(fileStatus.getPath().getName());
+ if (!dryRun) {
+ fileIO.delete(fileStatus.getPath(), true);
+ }
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -87,13 +100,14 @@ public class PurgeFilesProcedure extends BaseProcedure {
throw new RuntimeException(e);
}
- InternalRow outputRow =
- newInternalRow(
- UTF8String.fromString(
- String.format(
- "Success purge files with
table: %s.",
- fileStoreTable.name())));
- return new InternalRow[] {outputRow};
+ return deleteDir.isEmpty()
+ ? new InternalRow[] {
+ newInternalRow(
+ UTF8String.fromString("There are no dir to be deleted."))
+ }
+ : deleteDir.stream()
+ .map(x -> newInternalRow(UTF8String.fromString(x)))
+ .toArray(InternalRow[]::new);
});
}
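The Spark procedure's output column is likewise renamed from result to purged_file_path, so callers selecting the old column name will need updating; the returned rows mirror the Flink side, as the test below shows:

    -- dry run: returns snapshot, bucket-0, manifest; deletes nothing
    CALL paimon.sys.purge_files(table => 'test.T', dry_run => true);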
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
index b8911ada2f..02fa60f1e0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
@@ -40,4 +40,35 @@ class PurgeFilesProcedureTest extends PaimonSparkTestBase {
checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
}
+ test("Paimon procedure: purge files test with dry run.") {
+ spark.sql(s"""
+ |CREATE TABLE T (id STRING, name STRING)
+ |USING PAIMON
+ |""".stripMargin)
+
+ // There are no dir to be deleted.
+ checkAnswer(
+ spark.sql("CALL paimon.sys.purge_files(table => 'test.T')"),
+ Row("There are no dir to be deleted.") :: Nil
+ )
+
+ spark.sql("insert into T select '1', 'aa'");
+ checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
+
+ // dry run.
+ checkAnswer(
+ spark.sql("CALL paimon.sys.purge_files(table => 'test.T', dry_run =>
true)"),
+ Row("snapshot") :: Row("bucket-0") :: Row("manifest") :: Nil
+ )
+ checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
+
+ // Do delete.
+ spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
+ checkAnswer(spark.sql("select * from test.T"), Nil)
+
+ // insert new data.
+ spark.sql("insert into T select '2', 'aa'");
+ checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
+ }
+
}