This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f5e5ada899 [spark] Support compact_database procedure (#6328) (#6910)
f5e5ada899 is described below
commit f5e5ada89979bd8e8723f1c72f2b8fa4f4dc88a3
Author: userzhy <[email protected]>
AuthorDate: Sat Dec 27 09:50:02 2025 +0800
[spark] Support compact_database procedure (#6328) (#6910)
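
    Example usage, taken from the new procedure's Javadoc below (parameters are
    optional; database and table filters accept regular expressions):

        -- compact all databases
        CALL sys.compact_database()

        -- compact only the databases matching a pattern
        CALL sys.compact_database(including_databases => 'db1|db2')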
---
.../org/apache/paimon/spark/SparkProcedures.java | 2 +
.../spark/procedure/CompactDatabaseProcedure.java | 217 +++++++++++++++++++++
.../procedure/CompactDatabaseProcedureTest.scala | 204 +++++++++++++++++++
3 files changed, 423 insertions(+)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 99ed1e83a3..10d9792317 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -21,6 +21,7 @@ package org.apache.paimon.spark;
import org.apache.paimon.spark.procedure.AlterFunctionProcedure;
import org.apache.paimon.spark.procedure.AlterViewDialectProcedure;
import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
+import org.apache.paimon.spark.procedure.CompactDatabaseProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CopyFilesProcedure;
@@ -96,6 +97,7 @@ public class SparkProcedures {
procedureBuilders.put("create_global_index",
CreateGlobalIndexProcedure::builder);
procedureBuilders.put("delete_branch", DeleteBranchProcedure::builder);
procedureBuilders.put("compact", CompactProcedure::builder);
+        procedureBuilders.put("compact_database", CompactDatabaseProcedure::builder);
procedureBuilders.put("rescale", RescaleProcedure::builder);
procedureBuilders.put("migrate_database",
MigrateDatabaseProcedure::builder);
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java
new file mode 100644
index 0000000000..44889a8cb5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactDatabaseProcedure.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Compact database procedure. Usage:
+ *
+ * <pre><code>
+ * -- compact all databases
+ * CALL sys.compact_database()
+ *
+ * -- compact some databases (accept regular expression)
+ * CALL sys.compact_database(including_databases => 'db1|db2')
+ *
+ * -- compact some tables (accept regular expression)
+ * CALL sys.compact_database(including_databases => 'db1', including_tables => 'table1|table2')
+ *
+ * -- exclude some tables (accept regular expression)
+ * CALL sys.compact_database(including_databases => 'db1', including_tables => '.*', excluding_tables => 'ignore_table')
+ *
+ * -- set table options ('k=v,...')
+ * CALL sys.compact_database(including_databases => 'db1', options => 'key1=value1,key2=value2')
+ * </code></pre>
+ */
+public class CompactDatabaseProcedure extends BaseProcedure {
+
+    private static final Logger LOG = LoggerFactory.getLogger(CompactDatabaseProcedure.class);
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.optional("including_databases", StringType),
+ ProcedureParameter.optional("including_tables", StringType),
+ ProcedureParameter.optional("excluding_tables", StringType),
+ ProcedureParameter.optional("options", StringType),
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+                        new StructField("result", DataTypes.StringType, true, Metadata.empty())
+ });
+
+ protected CompactDatabaseProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+        String includingDatabases = args.isNullAt(0) ? ".*" : args.getString(0);
+ String includingTables = args.isNullAt(1) ? ".*" : args.getString(1);
+ String excludingTables = args.isNullAt(2) ? null : args.getString(2);
+ String options = args.isNullAt(3) ? null : args.getString(3);
+
+ Pattern databasePattern = Pattern.compile(includingDatabases);
+ Pattern includingPattern = Pattern.compile(includingTables);
+ Pattern excludingPattern =
+ StringUtils.isNullOrWhitespaceOnly(excludingTables)
+ ? null
+ : Pattern.compile(excludingTables);
+
+        Catalog paimonCatalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
+
+ int successCount = 0;
+ int failedCount = 0;
+
+ try {
+ List<String> databases = paimonCatalog.listDatabases();
+ for (String databaseName : databases) {
+                Matcher databaseMatcher = databasePattern.matcher(databaseName);
+ if (!databaseMatcher.matches()) {
+                    LOG.debug("Database '{}' is excluded by pattern.", databaseName);
+ continue;
+ }
+
+ List<String> tables = paimonCatalog.listTables(databaseName);
+ for (String tableName : tables) {
+                    String fullTableName = String.format("%s.%s", databaseName, tableName);
+
+                    if (!shouldCompactTable(fullTableName, includingPattern, excludingPattern)) {
+                        LOG.debug("Table '{}' is excluded by pattern.", fullTableName);
+ continue;
+ }
+
+ try {
+ Table table =
+                                paimonCatalog.getTable(Identifier.create(databaseName, tableName));
+ if (!(table instanceof FileStoreTable)) {
+ LOG.warn(
+                                "Only FileStoreTable supports compact action. "
+ + "Table '{}' type is '{}'.",
+ fullTableName,
+ table.getClass().getName());
+ continue;
+ }
+
+ compactTable(fullTableName, options);
+ successCount++;
+                        LOG.info("Successfully compacted table: {}", fullTableName);
+ } catch (Exception e) {
+ failedCount++;
+                        LOG.error("Failed to compact table: {}", fullTableName, e);
+ }
+ }
+ }
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(e);
+ }
+
+ String result =
+ String.format(
+ "Compact database finished. Success: %d, Failed: %d",
+ successCount, failedCount);
+        return new InternalRow[] {newInternalRow(UTF8String.fromString(result))};
+ }
+
+ private boolean shouldCompactTable(
+            String fullTableName, Pattern includingPattern, Pattern excludingPattern) {
+        boolean shouldCompact = includingPattern.matcher(fullTableName).matches();
+ if (excludingPattern != null) {
+            shouldCompact = shouldCompact && !excludingPattern.matcher(fullTableName).matches();
+ }
+ return shouldCompact;
+ }
+
+    private void compactTable(String tableName, String options) throws Exception {
+ LOG.info("Start to compact table: {}", tableName);
+
+ // Build CompactProcedure and call it for each table
+ CompactProcedure compactProcedure =
+ (CompactProcedure)
+                        CompactProcedure.builder().withTableCatalog(tableCatalog()).build();
+
+ // Create InternalRow with the parameters for CompactProcedure
+        // Parameters: table, partitions, compact_strategy, order_strategy, order_by, where,
+ // options, partition_idle_time
+ InternalRow compactArgs =
+ newInternalRow(
+ UTF8String.fromString(tableName), // table
+ null, // partitions
+ null, // compact_strategy
+ null, // order_strategy
+ null, // order_by
+ null, // where
+                        options == null ? null : UTF8String.fromString(options), // options
+ null // partition_idle_time
+ );
+
+ InternalRow[] result = compactProcedure.call(compactArgs);
+
+ if (result.length > 0 && !result[0].getBoolean(0)) {
+            throw new RuntimeException("Compact failed for table: " + tableName);
+ }
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<CompactDatabaseProcedure>() {
+ @Override
+ public CompactDatabaseProcedure doBuild() {
+ return new CompactDatabaseProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+        return "This procedure executes compact action on all tables in database(s).";
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala
new file mode 100644
index 0000000000..5bc4802f99
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactDatabaseProcedureTest.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.Snapshot.CommitKind
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
+
+import org.assertj.core.api.Assertions
+
+/** Test compact_database procedure. See [[CompactDatabaseProcedure]]. */
+class CompactDatabaseProcedureTest extends PaimonSparkTestBase {
+
+ def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
+ table.snapshotManager().latestSnapshot().commitKind()
+ }
+
+ def lastSnapshotId(table: FileStoreTable): Long = {
+ table.snapshotManager().latestSnapshotId()
+ }
+
+ test("Paimon Procedure: compact database - basic test") {
+ spark.sql("CREATE DATABASE IF NOT EXISTS test_db1")
+ spark.sql("CREATE DATABASE IF NOT EXISTS test_db2")
+
+ withTable("test_db1.T1", "test_db1.T2", "test_db2.T3") {
+ // Create tables in test_db1
+ spark.sql(s"""
+ |CREATE TABLE test_db1.T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE test_db1.T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+ // Create table in test_db2
+ spark.sql(s"""
+ |CREATE TABLE test_db2.T3 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+      // Insert data multiple times to create multiple files that need compaction
+ spark.sql("INSERT INTO test_db1.T1 VALUES (1, 'a'), (2, 'b')")
+ spark.sql("INSERT INTO test_db1.T1 VALUES (3, 'c'), (4, 'd')")
+
+ spark.sql("INSERT INTO test_db1.T2 VALUES (1, 'x'), (2, 'y')")
+ spark.sql("INSERT INTO test_db1.T2 VALUES (3, 'z'), (4, 'w')")
+
+ spark.sql("INSERT INTO test_db2.T3 VALUES (1, 'm'), (2, 'n')")
+ spark.sql("INSERT INTO test_db2.T3 VALUES (3, 'o'), (4, 'p')")
+
+ val table1 = loadTable("test_db1", "T1")
+ val table2 = loadTable("test_db1", "T2")
+ val table3 = loadTable("test_db2", "T3")
+
+ Assertions.assertThat(lastSnapshotId(table1)).isEqualTo(2)
+ Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2)
+ Assertions.assertThat(lastSnapshotId(table3)).isEqualTo(2)
+
+ // Compact all databases
+ val result = spark.sql("CALL sys.compact_database()").collect()
+ Assertions.assertThat(result.length).isEqualTo(1)
+ Assertions.assertThat(result(0).getString(0)).contains("Success")
+
+ // Verify compaction happened - reload table to get new snapshot
+ val table1After = loadTable("test_db1", "T1")
+ val table2After = loadTable("test_db1", "T2")
+ val table3After = loadTable("test_db2", "T3")
+      Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue
+      Assertions.assertThat(lastSnapshotCommand(table2After).equals(CommitKind.COMPACT)).isTrue
+      Assertions.assertThat(lastSnapshotCommand(table3After).equals(CommitKind.COMPACT)).isTrue
+ }
+
+ spark.sql("DROP DATABASE IF EXISTS test_db1 CASCADE")
+ spark.sql("DROP DATABASE IF EXISTS test_db2 CASCADE")
+ }
+
+ test("Paimon Procedure: compact database - with database filter") {
+ spark.sql("CREATE DATABASE IF NOT EXISTS db_include")
+ spark.sql("CREATE DATABASE IF NOT EXISTS db_exclude")
+
+ withTable("db_include.T1", "db_exclude.T2") {
+ spark.sql(s"""
+ |CREATE TABLE db_include.T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE db_exclude.T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO db_include.T1 VALUES (1, 'a'), (2, 'b')")
+ spark.sql("INSERT INTO db_include.T1 VALUES (3, 'c'), (4, 'd')")
+
+ spark.sql("INSERT INTO db_exclude.T2 VALUES (1, 'x'), (2, 'y')")
+ spark.sql("INSERT INTO db_exclude.T2 VALUES (3, 'z'), (4, 'w')")
+
+ val table1 = loadTable("db_include", "T1")
+ val table2 = loadTable("db_exclude", "T2")
+
+ // Compact only db_include database
+      spark.sql("CALL sys.compact_database(including_databases => 'db_include')")
+
+ // Only table1 should be compacted - reload table to get new snapshot
+ val table1After = loadTable("db_include", "T1")
+      Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue
+      Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2) // Still only 2 snapshots
+ }
+
+ spark.sql("DROP DATABASE IF EXISTS db_include CASCADE")
+ spark.sql("DROP DATABASE IF EXISTS db_exclude CASCADE")
+ }
+
+ test("Paimon Procedure: compact database - with table filter") {
+ spark.sql("CREATE DATABASE IF NOT EXISTS filter_db")
+
+ withTable("filter_db.include_table", "filter_db.exclude_table") {
+ spark.sql(s"""
+ |CREATE TABLE filter_db.include_table (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE filter_db.exclude_table (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+      spark.sql("INSERT INTO filter_db.include_table VALUES (1, 'a'), (2, 'b')")
+      spark.sql("INSERT INTO filter_db.include_table VALUES (3, 'c'), (4, 'd')")
+
+      spark.sql("INSERT INTO filter_db.exclude_table VALUES (1, 'x'), (2, 'y')")
+      spark.sql("INSERT INTO filter_db.exclude_table VALUES (3, 'z'), (4, 'w')")
+
+ val includeTable = loadTable("filter_db", "include_table")
+ val excludeTable = loadTable("filter_db", "exclude_table")
+
+ // Compact only include_table using including_tables pattern
+ spark.sql(
+        "CALL sys.compact_database(including_databases => 'filter_db', including_tables => '.*include.*')")
+
+ val includeTableAfter = loadTable("filter_db", "include_table")
+ Assertions
+        .assertThat(lastSnapshotCommand(includeTableAfter).equals(CommitKind.COMPACT))
+ .isTrue
+ Assertions.assertThat(lastSnapshotId(excludeTable)).isEqualTo(2)
+ }
+
+ spark.sql("DROP DATABASE IF EXISTS filter_db CASCADE")
+ }
+
+ test("Paimon Procedure: compact database - with excluding_tables filter") {
+ spark.sql("CREATE DATABASE IF NOT EXISTS exclude_db")
+
+ withTable("exclude_db.T1", "exclude_db.T2") {
+ spark.sql(s"""
+ |CREATE TABLE exclude_db.T1 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+ spark.sql(s"""
+ |CREATE TABLE exclude_db.T2 (id INT, value STRING)
+                   |TBLPROPERTIES ('primary-key'='id', 'bucket'='1', 'write-only'='true')
+ |""".stripMargin)
+
+ spark.sql("INSERT INTO exclude_db.T1 VALUES (1, 'a'), (2, 'b')")
+ spark.sql("INSERT INTO exclude_db.T1 VALUES (3, 'c'), (4, 'd')")
+
+ spark.sql("INSERT INTO exclude_db.T2 VALUES (1, 'x'), (2, 'y')")
+ spark.sql("INSERT INTO exclude_db.T2 VALUES (3, 'z'), (4, 'w')")
+
+ val table1 = loadTable("exclude_db", "T1")
+ val table2 = loadTable("exclude_db", "T2")
+
+ // Compact all tables except T2
+ spark.sql(
+        "CALL sys.compact_database(including_databases => 'exclude_db', excluding_tables => '.*T2')")
+
+ val table1After = loadTable("exclude_db", "T1")
+      Assertions.assertThat(lastSnapshotCommand(table1After).equals(CommitKind.COMPACT)).isTrue
+ Assertions.assertThat(lastSnapshotId(table2)).isEqualTo(2)
+ }
+
+ spark.sql("DROP DATABASE IF EXISTS exclude_db CASCADE")
+ }
+}