This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e5ada899 [spark] Support compact_database procedure (#6328) (#6910)
f5e5ada899 is described below

commit f5e5ada89979bd8e8723f1c72f2b8fa4f4dc88a3
Author: userzhy <[email protected]>
AuthorDate: Sat Dec 27 09:50:02 2025 +0800

    [spark] Support compact_database procedure (#6328) (#6910)
---
 .../org/apache/paimon/spark/SparkProcedures.java   |   2 +
 .../spark/procedure/CompactDatabaseProcedure.java  | 217 +++++++++++++++++++++
 .../procedure/CompactDatabaseProcedureTest.scala   | 204 +++++++++++++++++++
 3 files changed, 423 insertions(+)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 99ed1e83a3..10d9792317 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
 import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
 import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
 import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
+import org.apache.paimon.spark.procedure.CompactDatabaseProcedure;
 import org.apache.paimon.spark.procedure.CompactManifestProcedure;
 import org.apache.paimon.spark.procedure.CompactProcedure;
 import org.apache.paimon.spark.procedure.CopyFilesProcedure;
@@ -96,6 +97,7 @@ public class SparkProcedures {
         procedureBuilders.put("create_global_index", 
CreateGlobalIndexProcedure::builder);
         procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
         procedureBuilders.put("compact", CompactProcedure::builder);
+        procedureBuilders.put("compact_database", 
CompactDatabaseProcedure::builder);
         procedureBuilders.put("rescale", RescaleProcedure::builder);
         procedureBuilders.put("migrate_database", 
MigrateDatabaseProcedure::builder);
         procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java
new file mode 100644
index 0000000000..44889a8cb5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Compact database procedure. Usage:
+ *
+ * <pre><code>
+ *  -- compact all databases
+ *  CALL sys.compact_database()
+ *
+ *  -- compact some databases (accepts a regular expression)
+ *  CALL sys.compact_database(including_databases => 'db1|db2')
+ *
+ *  -- compact some tables (regex is matched against the full 'db.table' name)
+ *  CALL sys.compact_database(including_databases => 'db1', including_tables 
=> 'db1.table1|db1.table2')
+ *
+ *  -- exclude some tables (regex is matched against the full 'db.table' name)
+ *  CALL sys.compact_database(including_databases => 'db1', including_tables 
=> '.*', excluding_tables => 'db1.ignore_table')
+ *
+ *  -- set table options ('k=v,...')
+ *  CALL sys.compact_database(including_databases => 'db1', options => 
'key1=value1,key2=value2')
+ * </code></pre>
+ */
+public class CompactDatabaseProcedure extends BaseProcedure {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CompactDatabaseProcedure.class);
+
+    private static final ProcedureParameter[] PARAMETERS =
+            new ProcedureParameter[] {
+                ProcedureParameter.optional("including_databases", StringType),
+                ProcedureParameter.optional("including_tables", StringType),
+                ProcedureParameter.optional("excluding_tables", StringType),
+                ProcedureParameter.optional("options", StringType),
+            };
+
+    private static final StructType OUTPUT_TYPE =
+            new StructType(
+                    new StructField[] {
+                        new StructField("result", DataTypes.StringType, true, 
Metadata.empty())
+                    });
+
+    protected CompactDatabaseProcedure(TableCatalog tableCatalog) {
+        super(tableCatalog);
+    }
+
+    @Override
+    public ProcedureParameter[] parameters() {
+        return PARAMETERS;
+    }
+
+    @Override
+    public StructType outputType() {
+        return OUTPUT_TYPE;
+    }
+
+    @Override
+    public InternalRow[] call(InternalRow args) {
+        String includingDatabases = args.isNullAt(0) ? ".*" : 
args.getString(0);
+        String includingTables = args.isNullAt(1) ? ".*" : args.getString(1);
+        String excludingTables = args.isNullAt(2) ? null : args.getString(2);
+        String options = args.isNullAt(3) ? null : args.getString(3);
+
+        Pattern databasePattern = Pattern.compile(includingDatabases);
+        Pattern includingPattern = Pattern.compile(includingTables);
+        Pattern excludingPattern =
+                StringUtils.isNullOrWhitespaceOnly(excludingTables)
+                        ? null
+                        : Pattern.compile(excludingTables);
+
+        Catalog paimonCatalog = ((WithPaimonCatalog) 
tableCatalog()).paimonCatalog();
+
+        int successCount = 0;
+        int failedCount = 0;
+
+        try {
+            List<String> databases = paimonCatalog.listDatabases();
+            for (String databaseName : databases) {
+                Matcher databaseMatcher = 
databasePattern.matcher(databaseName);
+                if (!databaseMatcher.matches()) {
+                    LOG.debug("Database '{}' is excluded by pattern.", 
databaseName);
+                    continue;
+                }
+
+                List<String> tables = paimonCatalog.listTables(databaseName);
+                for (String tableName : tables) {
+                    String fullTableName = String.format("%s.%s", 
databaseName, tableName);
+
+                    if (!shouldCompactTable(fullTableName, includingPattern, 
excludingPattern)) {
+                        LOG.debug("Table '{}' is excluded by pattern.", 
fullTableName);
+                        continue;
+                    }
+
+                    try {
+                        Table table =
+                                
paimonCatalog.getTable(Identifier.create(databaseName, tableName));
+                        if (!(table instanceof FileStoreTable)) {
+                            LOG.warn(
+                                    "Only FileStoreTable supports compact 
action. "
+                                            + "Table '{}' type is '{}'.",
+                                    fullTableName,
+                                    table.getClass().getName());
+                            continue;
+                        }
+
+                        compactTable(fullTableName, options);
+                        successCount++;
+                        LOG.info("Successfully compacted table: {}", 
fullTableName);
+                    } catch (Exception e) {
+                        failedCount++;
+                        LOG.error("Failed to compact table: {}", 
fullTableName, e);
+                    }
+                }
+            }
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new RuntimeException(e);
+        }
+
+        String result =
+                String.format(
+                        "Compact database finished. Success: %d, Failed: %d",
+                        successCount, failedCount);
+        return new InternalRow[] 
{newInternalRow(UTF8String.fromString(result))};
+    }
+
+    private boolean shouldCompactTable(
+            String fullTableName, Pattern includingPattern, Pattern 
excludingPattern) {
+        boolean shouldCompact = 
includingPattern.matcher(fullTableName).matches();
+        if (excludingPattern != null) {
+            shouldCompact = shouldCompact && 
!excludingPattern.matcher(fullTableName).matches();
+        }
+        return shouldCompact;
+    }
+
+    private void compactTable(String tableName, String options) throws 
Exception {
+        LOG.info("Start to compact table: {}", tableName);
+
+        // Build a CompactProcedure on the same catalog and run it for this table
+        CompactProcedure compactProcedure =
+                (CompactProcedure)
+                        
CompactProcedure.builder().withTableCatalog(tableCatalog()).build();
+
+        // Create InternalRow with the parameters for CompactProcedure
+        // Parameters: table, partitions, compact_strategy, order_strategy, 
order_by, where,
+        // options, partition_idle_time
+        InternalRow compactArgs =
+                newInternalRow(
+                        UTF8String.fromString(tableName), // table
+                        null, // partitions
+                        null, // compact_strategy
+                        null, // order_strategy
+                        null, // order_by
+                        null, // where
+                        options == null ? null : 
UTF8String.fromString(options), // options
+                        null // partition_idle_time
+                        );
+
+        InternalRow[] result = compactProcedure.call(compactArgs);
+
+        if (result.length > 0 && !result[0].getBoolean(0)) {
+            throw new RuntimeException("Compact failed for table: " + 
tableName);
+        }
+    }
+
+    public static ProcedureBuilder builder() {
+        return new BaseProcedure.Builder<CompactDatabaseProcedure>() {
+            @Override
+            public CompactDatabaseProcedure doBuild() {
+                return new CompactDatabaseProcedure(tableCatalog());
+            }
+        };
+    }
+
+    @Override
+    public String description() {
+        return "This procedure executes compact action on all tables in 
database(s).";
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala
new file mode 100644
index 0000000000..5bc4802f99
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.assertj.core.api.Assertions
+
+/** Test compact_database procedure. See [[CompactDatabaseProcedure]]. */
+class CompactDatabaseProcedureTest extends PaimonSparkTestBase {
+
+  def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
+    table.snapshotManager().latestSnapshot().commitKind()
+  }
+
+  def lastSnapshotId(table: FileStoreTable): Long = {
+    table.snapshotManager().latestSnapshotId()
+  }
+
+  test("Paimon Procedure: compact database - basic test") {
+    spark.sql("CREATE DATABASE IF NOT EXISTS test_db1")
+    spark.sql("CREATE DATABASE IF NOT EXISTS test_db2")
+
+    withTable("test_db1.T1", "test_db1.T2", "test_db2.T3") {
+      // Create tables in test_db1
+      spark.sql(s"""
+                   |CREATE TABLE test_db1.T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE test_db1.T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      // Create table in test_db2
+      spark.sql(s"""
+                   |CREATE TABLE test_db2.T3 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      // Insert data multiple times to create multiple files that need 
compaction
+      spark.sql("INSERT INTO test_db1.T1 VALUES (1, 'a'), (2, 'b')")
+      spark.sql("INSERT INTO test_db1.T1 VALUES (3, 'c'), (4, 'd')")
+
+      spark.sql("INSERT INTO test_db1.T2 VALUES (1, 'x'), (2, 'y')")
+      spark.sql("INSERT INTO test_db1.T2 VALUES (3, 'z'), (4, 'w')")
+
+      spark.sql("INSERT INTO test_db2.T3 VALUES (1, 'm'), (2, 'n')")
+      spark.sql("INSERT INTO test_db2.T3 VALUES (3, 'o'), (4, 'p')")
+
+      val table1 = loadTable("test_db1", "T1")
+      val table2 = loadTable("test_db1", "T2")
+      val table3 = loadTable("test_db2", "T3")
+
+      Assertions.assertThat(lastSnapshotId(table1)).isEqualTo(2)
+      Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2)
+      Assertions.assertThat(lastSnapshotId(table3)).isEqualTo(2)
+
+      // Compact all databases
+      val result = spark.sql("CALL sys.compact_database()").collect()
+      Assertions.assertThat(result.length).isEqualTo(1)
+      Assertions.assertThat(result(0).getString(0)).contains("Success")
+
+      // Verify compaction happened - reload table to get new snapshot
+      val table1After = loadTable("test_db1", "T1")
+      val table2After = loadTable("test_db1", "T2")
+      val table3After = loadTable("test_db2", "T3")
+      
Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue
+      
Assertions.assertThat(lastSnapshotCommand(table2After).equals(CommitKind.COMPACT)).isTrue
+      
Assertions.assertThat(lastSnapshotCommand(table3After).equals(CommitKind.COMPACT)).isTrue
+    }
+
+    spark.sql("DROP DATABASE IF EXISTS test_db1 CASCADE")
+    spark.sql("DROP DATABASE IF EXISTS test_db2 CASCADE")
+  }
+
+  test("Paimon Procedure: compact database - with database filter") {
+    spark.sql("CREATE DATABASE IF NOT EXISTS db_include")
+    spark.sql("CREATE DATABASE IF NOT EXISTS db_exclude")
+
+    withTable("db_include.T1", "db_exclude.T2") {
+      spark.sql(s"""
+                   |CREATE TABLE db_include.T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE db_exclude.T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      spark.sql("INSERT INTO db_include.T1 VALUES (1, 'a'), (2, 'b')")
+      spark.sql("INSERT INTO db_include.T1 VALUES (3, 'c'), (4, 'd')")
+
+      spark.sql("INSERT INTO db_exclude.T2 VALUES (1, 'x'), (2, 'y')")
+      spark.sql("INSERT INTO db_exclude.T2 VALUES (3, 'z'), (4, 'w')")
+
+      val table1 = loadTable("db_include", "T1")
+      val table2 = loadTable("db_exclude", "T2")
+
+      // Compact only db_include database
+      spark.sql("CALL sys.compact_database(including_databases => 
'db_include')")
+
+      // Only table1 should be compacted - reload table to get new snapshot
+      val table1After = loadTable("db_include", "T1")
+      
Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue
+      Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2) // Still only 
2 snapshots
+    }
+
+    spark.sql("DROP DATABASE IF EXISTS db_include CASCADE")
+    spark.sql("DROP DATABASE IF EXISTS db_exclude CASCADE")
+  }
+
+  test("Paimon Procedure: compact database - with table filter") {
+    spark.sql("CREATE DATABASE IF NOT EXISTS filter_db")
+
+    withTable("filter_db.include_table", "filter_db.exclude_table") {
+      spark.sql(s"""
+                   |CREATE TABLE filter_db.include_table (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE filter_db.exclude_table (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      spark.sql("INSERT INTO filter_db.include_table VALUES (1, 'a'), (2, 
'b')")
+      spark.sql("INSERT INTO filter_db.include_table VALUES (3, 'c'), (4, 
'd')")
+
+      spark.sql("INSERT INTO filter_db.exclude_table VALUES (1, 'x'), (2, 
'y')")
+      spark.sql("INSERT INTO filter_db.exclude_table VALUES (3, 'z'), (4, 
'w')")
+
+      val includeTable = loadTable("filter_db", "include_table")
+      val excludeTable = loadTable("filter_db", "exclude_table")
+
+      // Compact only include_table using including_tables pattern
+      spark.sql(
+        "CALL sys.compact_database(including_databases => 'filter_db', 
including_tables => '.*include.*')")
+
+      val includeTableAfter = loadTable("filter_db", "include_table")
+      Assertions
+        
.assertThat(lastSnapshotCommand(includeTableAfter).equals(CommitKind.COMPACT))
+        .isTrue
+      Assertions.assertThat(lastSnapshotId(excludeTable)).isEqualTo(2)
+    }
+
+    spark.sql("DROP DATABASE IF EXISTS filter_db CASCADE")
+  }
+
+  test("Paimon Procedure: compact database - with excluding_tables filter") {
+    spark.sql("CREATE DATABASE IF NOT EXISTS exclude_db")
+
+    withTable("exclude_db.T1", "exclude_db.T2") {
+      spark.sql(s"""
+                   |CREATE TABLE exclude_db.T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      spark.sql(s"""
+                   |CREATE TABLE exclude_db.T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 
'write-only'='true')
+                   |""".stripMargin)
+
+      spark.sql("INSERT INTO exclude_db.T1 VALUES (1, 'a'), (2, 'b')")
+      spark.sql("INSERT INTO exclude_db.T1 VALUES (3, 'c'), (4, 'd')")
+
+      spark.sql("INSERT INTO exclude_db.T2 VALUES (1, 'x'), (2, 'y')")
+      spark.sql("INSERT INTO exclude_db.T2 VALUES (3, 'z'), (4, 'w')")
+
+      val table1 = loadTable("exclude_db", "T1")
+      val table2 = loadTable("exclude_db", "T2")
+
+      // Compact all tables except T2
+      spark.sql(
+        "CALL sys.compact_database(including_databases => 'exclude_db', 
excluding_tables => '.*T2')")
+
+      val table1After = loadTable("exclude_db", "T1")
+      
Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue
+      Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2)
+    }
+
+    spark.sql("DROP DATABASE IF EXISTS exclude_db CASCADE")
+  }
+}

Reply via email to