This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a5c41c92f [spark] CompactProcedure add options parameter (#3672)
a5c41c92f is described below
commit a5c41c92f3fd01571764d255c1e630e9cac70b4f
Author: askwang <[email protected]>
AuthorDate: Fri Jul 5 13:41:20 2024 +0800
[spark] CompactProcedure add options parameter (#3672)
---
.../paimon/spark/procedure/CompactProcedure.java | 14 +++++++--
.../spark/procedure/CompactProcedureTestBase.scala | 33 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 2 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index e21702bab..eaf7e3a00 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -74,6 +74,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -110,7 +111,8 @@ public class CompactProcedure extends BaseProcedure {
ProcedureParameter.optional("order_strategy", StringType),
ProcedureParameter.optional("order_by", StringType),
ProcedureParameter.optional("where", StringType),
- ProcedureParameter.optional("max_concurrent_jobs", IntegerType)
+ ProcedureParameter.optional("max_concurrent_jobs", IntegerType),
+ ProcedureParameter.optional("options", StringType),
};
private static final StructType OUTPUT_TYPE =
@@ -144,6 +146,7 @@ public class CompactProcedure extends BaseProcedure {
: Arrays.asList(args.getString(3).split(","));
String where = blank(args, 4) ? null : args.getString(4);
int maxConcurrentJobs = args.isNullAt(5) ? 15 : args.getInt(5);
+ String options = args.isNullAt(6) ? null : args.getString(6);
if (TableSorter.OrderType.NONE.name().equals(sortType) && !sortColumns.isEmpty()) {
throw new IllegalArgumentException(
"order_strategy \"none\" cannot work with order_by
columns.");
@@ -174,6 +177,14 @@ public class CompactProcedure extends BaseProcedure {
condition,
table.partitionKeys());
}
+
+ Map<String, String> dynamicOptions = new HashMap<>();
+ dynamicOptions.put(CoreOptions.WRITE_ONLY.key(), "false");
+ if (!StringUtils.isBlank(options)) {
+ dynamicOptions.putAll(ParameterUtils.parseCommaSeparatedKeyValues(options));
+ }
+ table = table.copy(dynamicOptions);
+
InternalRow internalRow =
newInternalRow(
execute(
@@ -203,7 +214,6 @@ public class CompactProcedure extends BaseProcedure {
DataSourceV2Relation relation,
@Nullable Expression condition,
int maxConcurrentJobs) {
- table = table.copy(Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "false"));
BucketMode bucketMode = table.bucketMode();
TableSorter.OrderType orderType = TableSorter.OrderType.of(sortType);
Predicate filter =
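For readers skimming the diff: the new `options` argument is a comma-separated `key=value` string that is parsed into a map, merged over a forced `write-only=false`, and applied to the table via `table.copy(dynamicOptions)` before compaction runs (replacing the hard-coded copy removed from `execute` above). Below is a minimal, self-contained sketch of that flow; it re-implements the parsing locally for illustration only, since the real code delegates to `ParameterUtils.parseCommaSeparatedKeyValues`, and the whitespace trimming shown is an assumption inferred from the test input "compaction.max.file-num = 3".

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch, not the Paimon implementation.
public class CompactOptionsSketch {

    // Assumed stand-in for ParameterUtils.parseCommaSeparatedKeyValues.
    static Map<String, String> parseCommaSeparatedKeyValues(String options) {
        Map<String, String> result = new HashMap<>();
        for (String pair : options.split(",")) {
            String[] kv = pair.split("=", 2);
            if (kv.length != 2) {
                throw new IllegalArgumentException("Invalid key-value pair: " + pair);
            }
            // Trim so "compaction.max.file-num = 3" parses like "compaction.max.file-num=3".
            result.put(kv[0].trim(), kv[1].trim());
        }
        return result;
    }

    public static void main(String[] args) {
        // Compaction has to write, so write-only is forced off first ...
        Map<String, String> dynamicOptions = new HashMap<>();
        dynamicOptions.put("write-only", "false");
        // ... then the caller's options are layered on top, as in the diff.
        dynamicOptions.putAll(
                parseCommaSeparatedKeyValues(
                        "compaction.min.file-num=2,compaction.max.file-num = 3"));
        System.out.println(dynamicOptions);
        // e.g. {write-only=false, compaction.max.file-num=3, compaction.min.file-num=2}
    }
}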
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 245076f7b..73d6fc442 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -528,6 +528,39 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
Assertions.assertThat(where).isEqualTo(whereExpected)
}
+ test("Paimon Procedure: compact unaware bucket append table with option") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, value STRING, pt STRING)
+ |TBLPROPERTIES ('bucket'='-1', 'write-only'='true')
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+
+ val table = loadTable("T")
+
+ spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p2')")
+ spark.sql(s"INSERT INTO T VALUES (3, 'c', 'p1'), (4, 'd', 'p2')")
+ spark.sql(s"INSERT INTO T VALUES (5, 'e', 'p1'), (6, 'f', 'p2')")
+
+ spark.sql(
+ "CALL sys.compact(table => 'T', partitions => 'pt=\"p1\"', options =>
'compaction.min.file-num=2,compaction.max.file-num = 3')")
+
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+ Assertions.assertThat(lastSnapshotId(table)).isEqualTo(4)
+
+ spark.sql(
+ "CALL sys.compact(table => 'T', options =>
'compaction.min.file-num=2,compaction.max.file-num = 3')")
+
Assertions.assertThat(lastSnapshotCommand(table).equals(CommitKind.COMPACT)).isTrue
+ Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+ // compaction condition no longer met, so no new snapshot is created
+ spark.sql(s"CALL sys.compact(table => 'T')")
+ Assertions.assertThat(lastSnapshotId(table)).isEqualTo(5)
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM T ORDER BY id"),
+ Row(1, "a", "p1") :: Row(2, "b", "p2") :: Row(3, "c", "p1") :: Row(4, "d", "p2") ::
+ Row(5, "e", "p1") :: Row(6, "f", "p2") :: Nil)
+ }
+
def lastSnapshotCommand(table: FileStoreTable): CommitKind = {
table.snapshotManager().latestSnapshot().commitKind()
}
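Hedged usage sketch: after this change, per-run compaction settings can be passed straight to the procedure call, mirroring the invocation in the test above. The snippet assumes a SparkSession with the Paimon Spark catalog already configured; the session setup itself is not part of this commit.

import org.apache.spark.sql.SparkSession;

// Sketch of calling the procedure with the new "options" argument,
// based on the CALL statements exercised in the test above.
public class CompactCallExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().getOrCreate();
        spark.sql(
                "CALL sys.compact(table => 'T', "
                        + "options => 'compaction.min.file-num=2,compaction.max.file-num=3')");
    }
}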