This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new a5c41c92f [spark] CompactProcedure add options parameter (#3672)
a5c41c92f is described below

commit a5c41c92f3fd01571764d255c1e630e9cac70b4f
Author: askwang <[email protected]>
AuthorDate: Fri Jul 5 13:41:20 2024 +0800

    [spark] CompactProcedure add options parameter (#3672)
---
 .../paimon/spark/procedure/CompactProcedure.java   | 14 +++++++--
 .../spark/procedure/CompactProcedureTestBase.scala | 33 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index e21702bab..eaf7e3a00 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -74,6 +74,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -110,7 +111,8 @@ public class CompactProcedure extends BaseProcedure {
                 ProcedureParameter.optional("order_strategy", StringType),
                 ProcedureParameter.optional("order_by", StringType),
                 ProcedureParameter.optional("where", StringType),
-                ProcedureParameter.optional("max_concurrent_jobs", IntegerType)
+                ProcedureParameter.optional("max_concurrent_jobs", IntegerType),
+                ProcedureParameter.optional("options", StringType),
             };
 
     private static final StructType OUTPUT_TYPE =
@@ -144,6 +146,7 @@ public class CompactProcedure extends BaseProcedure {
                         : Arrays.asList(args.getString(3).split(","));
         String where = blank(args, 4) ? null : args.getString(4);
         int maxConcurrentJobs = args.isNullAt(5) ? 15 : args.getInt(5);
+        String options = args.isNullAt(6) ? null : args.getString(6);
         if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
             throw new IllegalArgumentException(
                     "order_strategy \"none\" cannot work with order_by columns.");
@@ -174,6 +177,14 @@ public class CompactProcedure extends BaseProcedure {
                                 condition,
                                 table.partitionKeys());
                     }
+
+                    Map<String, String> dynamicOptions = new HashMap<>();
+                    dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
+                    if (!StringUtils.isBlank(options)) {
+                        dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
+                    }
+                    table = table.copy(dynamicOptions);
+
                     InternalRow internalRow =
                             newInternalRow(
                                     execute(
@@ -203,7 +214,6 @@ public class CompactProcedure extends BaseProcedure {
             DataSourceV2Relation relation,
             @Nullable Expression condition,
             int maxConcurrentJobs) {
-        table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
         BucketMode bucketMode = table.bucketMode();
         TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
         Predicate filter =
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 245076f7b..73d6fc442 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -528,6 +528,39 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     Assertions.assertThat(where).isEqualTo(whereExpected)
   }
 
+  test("Paimon Procedure: compact unaware bucket append table with option") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, value STRING, pt STRING)
+                 |TBLPROPERTIES ('bucket'='-1', 'write-only'='true')
+                 |PARTITIONED BY (pt)
+                 |""".stripMargin)
+
+    val table = loadTable("T")
+
+    spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+    spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
+    spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')")
+
+    spark.sql(
+      "CALL sys.compact(table => 'T', partitions => 'pt=\"p1\"', options => 'compaction.min.file-num=2,compaction.max.file-num = 3')")
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
+
+    spark.sql(
+      "CALL sys.compact(table => 'T', options => 'compaction.min.file-num=2,compaction.max.file-num = 3')")
+    Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+    // compact condition no longer met
+    spark.sql(s"CALL sys.compact(table => 'T')")
+    Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+    checkAnswer(
+      spark.sql(s"SELECT * FROM T ORDER BY id"),
+      Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") ::
+        Row(5, "e", "p1") :: Row(6, "f", "p2") :: Nil)
+  }
+
   def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
     table.snapshotManager().latestSnapshot().commitKind()
   }

Reply via email to