This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new be461dcf4a [core] Add dry_run parameters to purge_files Procedure and display the list of deleted directories (#5342)
be461dcf4a is described below

commit be461dcf4a4976d9a90c4e82ba211dbc62c1ba96
Author: HunterXHunter <[email protected]>
AuthorDate: Thu Mar 27 17:48:40 2025 +0800

    [core] Add dry_run parameters to purge_files Procedure and display the list of deleted directories (#5342)
---
 docs/content/flink/procedures.md                   | 15 ++++++---
 docs/content/spark/procedures.md                   |  4 ++-
 .../flink/procedure/PurgeFilesProcedure.java       | 23 +++++++++++---
 .../flink/procedure/PurgeFilesProcedure.java       | 29 ++++++++++++-----
 .../flink/procedure/PurgeFilesProcedureITCase.java | 35 +++++++++++++++++++++
 .../spark/procedure/PurgeFilesProcedure.java       | 36 +++++++++++++++-------
 .../spark/procedure/PurgeFilesProcedureTest.scala  | 31 +++++++++++++++++++
 7 files changed, 145 insertions(+), 28 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 49507721a9..e3013c1384 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -488,20 +488,27 @@ All available procedures are listed below.
       <td>
          -- for Flink 1.18<br/>
          -- clear table with purge files directly.<br/>
-         CALL [catalog.]sys.purge_files('identifier')<br/><br/>
+         CALL [catalog.]sys.purge_files('identifier')<br/>
+         -- only check what dirs will be deleted, but not really delete them.<br/>
+         CALL [catalog.]sys.purge_files('identifier', true)<br/><br/>
          -- for Flink 1.19 and later<br/>
          -- clear table with purge files directly.<br/>
-         CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/><br/>
+         CALL [catalog.]sys.purge_files(`table` => 'default.T')<br/>
+         -- only check what dirs will be deleted, but not really delete them.<br/>
+         CALL [catalog.]sys.purge_files(`table` => 'default.T', `dry_run` => true)<br/><br/>
       </td>
       <td>
          To clear table with purge files directly. Argument:
             <li>table: the target table identifier. Cannot be empty.</li>
+            <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
       </td>
       <td>
          -- for Flink 1.18<br/>
-         CALL sys.purge_files('default.T')<br/><br/>
+         CALL sys.purge_files('default.T')<br/>
+         CALL sys.purge_files('default.T', true)<br/><br/>
          -- for Flink 1.19 and later<br/>
-         CALL sys.purge_files(`table` => 'default.T')
+         CALL sys.purge_files(`table` => 'default.T')<br/>
+         CALL sys.purge_files(`table` => 'default.T', `dry_run` => true)
       </td>
    </tr>
    <tr>
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index fb46b6266c..21d40e610d 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -200,9 +200,11 @@ This section introduce all available spark procedures about paimon.
       <td>
          To clear table with purge files directly. Argument:
             <li>table: the target table identifier. Cannot be empty.</li>
+            <li>dry_run (optional): only check what dirs will be deleted, but not really delete them. Default is false.</li>
       </td>
       <td>
-          CALL sys.purge_files(table => 'default.T')<br/><br/>
+          CALL sys.purge_files(table => 'default.T')<br/>
+          CALL sys.purge_files(table => 'default.T', dry_run => true)
       </td>
     </tr>
     <tr>
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 3053eae3c7..2901fd1f59 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.procedure;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -28,6 +29,7 @@ import org.apache.paimon.table.Table;
 import org.apache.flink.table.procedure.ProcedureContext;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 
 /** A procedure to purge files for a table. */
@@ -36,17 +38,28 @@ public class PurgeFilesProcedure extends ProcedureBase {
 
     public String[] call(ProcedureContext procedureContext, String tableId)
             throws Catalog.TableNotExistException {
+        return call(procedureContext, tableId, false);
+    }
+
+    public String[] call(ProcedureContext procedureContext, String tableId, 
boolean dryRun)
+            throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileIO fileIO = fileStoreTable.fileIO();
         Path tablePath = fileStoreTable.snapshotManager().tablePath();
+        ArrayList<String> deleteDir;
         try {
-            Arrays.stream(fileIO.listStatus(tablePath))
+            FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
+            deleteDir = new ArrayList<>(fileStatuses.length);
+            Arrays.stream(fileStatuses)
                     .filter(f -> !f.getPath().getName().contains("schema"))
                     .forEach(
                             fileStatus -> {
                                 try {
-                                    fileIO.delete(fileStatus.getPath(), true);
+                                    
deleteDir.add(fileStatus.getPath().getName());
+                                    if (!dryRun) {
+                                        fileIO.delete(fileStatus.getPath(), 
true);
+                                    }
                                 } catch (IOException e) {
                                     throw new RuntimeException(e);
                                 }
@@ -54,9 +67,9 @@ public class PurgeFilesProcedure extends ProcedureBase {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return new String[] {
-            String.format("Success purge files with table: %s.", 
fileStoreTable.name())
-        };
+        return deleteDir.isEmpty()
+                ? new String[] {"There are no dir to be deleted."}
+                : deleteDir.toArray(new String[0]);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
index 7ee2a36104..64529db3c8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/PurgeFilesProcedure.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.procedure;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -29,8 +30,10 @@ import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
 import org.apache.flink.table.annotation.ProcedureHint;
 import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.types.Row;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 
 /**
@@ -45,20 +48,31 @@ public class PurgeFilesProcedure extends ProcedureBase {
 
     public static final String IDENTIFIER = "purge_files";
 
-    @ProcedureHint(argument = {@ArgumentHint(name = "table", type = 
@DataTypeHint("STRING"))})
-    public String[] call(ProcedureContext procedureContext, String tableId)
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "dry_run", type = 
@DataTypeHint("BOOLEAN"), isOptional = true)
+            })
+    public @DataTypeHint("ROW<purged_file_path STRING>") Row[] call(
+            ProcedureContext procedureContext, String tableId, Boolean dryRun)
             throws Catalog.TableNotExistException {
         Table table = catalog.getTable(Identifier.fromString(tableId));
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileIO fileIO = fileStoreTable.fileIO();
         Path tablePath = fileStoreTable.snapshotManager().tablePath();
+        ArrayList<String> deleteDir;
         try {
-            Arrays.stream(fileIO.listStatus(tablePath))
+            FileStatus[] fileStatuses = fileIO.listStatus(tablePath);
+            deleteDir = new ArrayList<>(fileStatuses.length);
+            Arrays.stream(fileStatuses)
                     .filter(f -> !f.getPath().getName().contains("schema"))
                     .forEach(
                             fileStatus -> {
                                 try {
-                                    fileIO.delete(fileStatus.getPath(), true);
+                                    
deleteDir.add(fileStatus.getPath().getName());
+                                    if (dryRun == null || !dryRun) {
+                                        fileIO.delete(fileStatus.getPath(), 
true);
+                                    }
                                 } catch (IOException e) {
                                     throw new RuntimeException(e);
                                 }
@@ -66,9 +80,10 @@ public class PurgeFilesProcedure extends ProcedureBase {
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
-        return new String[] {
-            String.format("Success purge files with table: %s.", 
fileStoreTable.name())
-        };
+
+        return deleteDir.isEmpty()
+                ? new Row[] {Row.of("There are no dir to be deleted.")}
+                : deleteDir.stream().map(Row::of).toArray(Row[]::new);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
index 9eb9aad296..6cd4351783 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/PurgeFilesProcedureITCase.java
@@ -45,4 +45,39 @@ public class PurgeFilesProcedureITCase extends CatalogITCaseBase {
         sql("INSERT INTO T VALUES (2, 'a')");
         assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
     }
+
+    @Test
+    public void testPurgeFilesDryRun() {
+        sql(
+                "CREATE TABLE T (id INT, name STRING,"
+                        + " PRIMARY KEY (id) NOT ENFORCED)"
+                        + " WITH ('bucket'='1')");
+        // There are no dir to delete.
+        assertThat(
+                        sql("CALL sys.purge_files(`table` => 'default.T', 
`dry_run` => true)")
+                                .stream()
+                                .map(row -> row.getField(0)))
+                .containsExactlyInAnyOrder("There are no dir to be deleted.");
+
+        sql("INSERT INTO T VALUES (1, 'a')");
+        assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+        // dry run.
+        assertThat(
+                        sql("CALL sys.purge_files(`table` => 'default.T', 
`dry_run` => true)")
+                                .stream()
+                                .map(row -> row.getField(0)))
+                .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
+
+        assertThat(sql("select * from `T`")).containsExactly(Row.of(1, "a"));
+
+        assertThat(
+                        sql("CALL sys.purge_files(`table` => 
'default.T')").stream()
+                                .map(row -> row.getField(0)))
+                .containsExactlyInAnyOrder("snapshot", "bucket-0", "manifest");
+        assertThat(sql("select * from `T`")).containsExactly();
+
+        sql("INSERT INTO T VALUES (2, 'a')");
+        assertThat(sql("select * from `T`")).containsExactly(Row.of(2, "a"));
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
index 9db724294a..3def44b5ae 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/PurgeFilesProcedure.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.table.FileStoreTable;
 
@@ -31,20 +32,25 @@ import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 
+import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /** A procedure to purge files for a table. */
 public class PurgeFilesProcedure extends BaseProcedure {
 
     private static final ProcedureParameter[] PARAMETERS =
-            new ProcedureParameter[] {ProcedureParameter.required("table", 
StringType)};
+            new ProcedureParameter[] {
+                ProcedureParameter.required("table", StringType),
+                ProcedureParameter.optional("dry_run", BooleanType)
+            };
 
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", StringType, true, 
Metadata.empty())
+                        new StructField("purged_file_path", StringType, true, 
Metadata.empty())
                     });
 
     private PurgeFilesProcedure(TableCatalog tableCatalog) {
@@ -64,6 +70,7 @@ public class PurgeFilesProcedure extends BaseProcedure {
     @Override
     public InternalRow[] call(InternalRow args) {
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
+        boolean dryRun = !args.isNullAt(1) && args.getBoolean(1);
 
         return modifyPaimonTable(
                 tableIdent,
@@ -71,13 +78,19 @@ public class PurgeFilesProcedure extends BaseProcedure {
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
                     FileIO fileIO = fileStoreTable.fileIO();
                     Path tablePath = 
fileStoreTable.snapshotManager().tablePath();
+                    ArrayList<String> deleteDir;
                     try {
-                        Arrays.stream(fileIO.listStatus(tablePath))
+                        FileStatus[] fileStatuses = 
fileIO.listStatus(tablePath);
+                        deleteDir = new ArrayList<>(fileStatuses.length);
+                        Arrays.stream(fileStatuses)
                                 .filter(f -> 
!f.getPath().getName().contains("schema"))
                                 .forEach(
                                         fileStatus -> {
                                             try {
-                                                
fileIO.delete(fileStatus.getPath(), true);
+                                                
deleteDir.add(fileStatus.getPath().getName());
+                                                if (!dryRun) {
+                                                    
fileIO.delete(fileStatus.getPath(), true);
+                                                }
                                             } catch (IOException e) {
                                                 throw new RuntimeException(e);
                                             }
@@ -87,13 +100,14 @@ public class PurgeFilesProcedure extends BaseProcedure {
                         throw new RuntimeException(e);
                     }
 
-                    InternalRow outputRow =
-                            newInternalRow(
-                                    UTF8String.fromString(
-                                            String.format(
-                                                    "Success purge files with 
table: %s.",
-                                                    fileStoreTable.name())));
-                    return new InternalRow[] {outputRow};
+                    return deleteDir.isEmpty()
+                            ? new InternalRow[] {
+                                newInternalRow(
+                                        UTF8String.fromString("There are no 
dir to be deleted."))
+                            }
+                            : deleteDir.stream()
+                                    .map(x -> 
newInternalRow(UTF8String.fromString(x)))
+                                    .toArray(InternalRow[]::new);
                 });
     }
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
index b8911ada2f..02fa60f1e0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/PurgeFilesProcedureTest.scala
@@ -40,4 +40,35 @@ class PurgeFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
   }
 
+  test("Paimon procedure: purge files test with dry run.") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id STRING, name STRING)
+                 |USING PAIMON
+                 |""".stripMargin)
+
+    // There are no dir to be deleted.
+    checkAnswer(
+      spark.sql("CALL paimon.sys.purge_files(table => 'test.T')"),
+      Row("There are no dir to be deleted.") :: Nil
+    )
+
+    spark.sql("insert into T select '1', 'aa'");
+    checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
+
+    // dry run.
+    checkAnswer(
+      spark.sql("CALL paimon.sys.purge_files(table => 'test.T', dry_run => 
true)"),
+      Row("snapshot") :: Row("bucket-0") :: Row("manifest") :: Nil
+    )
+    checkAnswer(spark.sql("select * from test.T"), Row("1", "aa") :: Nil)
+
+    // Do delete.
+    spark.sql("CALL paimon.sys.purge_files(table => 'test.T')")
+    checkAnswer(spark.sql("select * from test.T"), Nil)
+
+    // insert new data.
+    spark.sql("insert into T select '2', 'aa'");
+    checkAnswer(spark.sql("select * from test.T"), Row("2", "aa") :: Nil)
+  }
+
 }

Reply via email to