This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 87bafe8071 [spark] Add partition filter for incremental clustering procedure (#6640)
87bafe8071 is described below
commit 87bafe8071b24394f9ec466bf637a78695d85d96
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Nov 20 16:05:55 2025 +0800
[spark] Add partition filter for incremental clustering procedure (#6640)
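For context, a minimal usage sketch in Spark SQL, mirroring the test added below (the table name, partition column, and property values are illustrative): incremental clustering is enabled through table properties, and the where argument of sys.compact now restricts clustering to the matching partitions.

    -- Illustrative setup: a partitioned, bucket-unaware table with
    -- incremental z-order clustering enabled.
    CREATE TABLE T (a INT, b INT, pt INT) PARTITIONED BY (pt)
    TBLPROPERTIES (
      'bucket' = '-1',
      'clustering.columns' = 'a,b',
      'clustering.strategy' = 'zorder',
      'clustering.incremental' = 'true'
    );

    -- With this change the partition filter is honored: only pt = 0 is
    -- clustered; other partitions keep their files and levels untouched.
    CALL sys.compact(table => 'T', where => 'pt = 0');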
---
.../paimon/spark/procedure/CompactProcedure.java | 11 +++++--
.../spark/procedure/CompactProcedureTestBase.scala | 34 ++++++++++++++++++++++
2 files changed, 42 insertions(+), 3 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 93305a2aec..f7ae50c3b1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -268,7 +268,8 @@ public class CompactProcedure extends BaseProcedure {
                     break;
                 case BUCKET_UNAWARE:
                     if (clusterIncrementalEnabled) {
-                        clusterIncrementalUnAwareBucketTable(table, fullCompact, relation);
+                        clusterIncrementalUnAwareBucketTable(
+                                table, partitionPredicate, fullCompact, relation);
                     } else {
                         compactUnAwareBucketTable(
                                 table, partitionPredicate, partitionIdleTime, javaSparkContext);
@@ -530,8 +531,12 @@ public class CompactProcedure extends BaseProcedure {
     }
 
     private void clusterIncrementalUnAwareBucketTable(
-            FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) {
-        IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
+            FileStoreTable table,
+            @Nullable PartitionPredicate partitionPredicate,
+            boolean fullCompaction,
+            DataSourceV2Relation relation) {
+        IncrementalClusterManager incrementalClusterManager =
+                new IncrementalClusterManager(table, partitionPredicate);
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.createCompactUnits(fullCompaction);
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 81e475e983..e89eba2e85 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -1139,6 +1139,40 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     }
   }
 
+ test("Paimon Procedure: cluster for partitioned table with partition
filter") {
+ sql(
+ """
+ |CREATE TABLE T (a INT, b INT, pt INT)
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES (
+ | 'bucket'='-1', 'num-levels'='6',
'num-sorted-run.compaction-trigger'='2',
+ | 'clustering.columns'='a,b', 'clustering.strategy'='zorder',
'clustering.incremental' = 'true'
+ |)
+ |""".stripMargin)
+
+ sql("INSERT INTO T VALUES (0, 0, 0), (0, 0, 1)")
+ sql("INSERT INTO T VALUES (0, 1, 0), (0, 1, 1)")
+ sql("INSERT INTO T VALUES (0, 2, 0), (0, 2, 1)")
+ sql("INSERT INTO T VALUES (1, 0, 0), (1, 0, 1)")
+ sql("INSERT INTO T VALUES (1, 1, 0), (1, 1, 1)")
+ sql("INSERT INTO T VALUES (1, 2, 0), (1, 2, 1)")
+ sql("INSERT INTO T VALUES (2, 0, 0), (2, 0, 1)")
+ sql("INSERT INTO T VALUES (2, 1, 0), (2, 1, 1)")
+ sql("INSERT INTO T VALUES (2, 2, 0), (2, 2, 1)")
+
+ sql("CALL sys.compact(table => 'T', where => 'pt = 0')")
+ checkAnswer(
+ sql("select distinct partition, level from `T$files` order by
partition"),
+ Seq(Row("{0}", 5), Row("{1}", 0))
+ )
+
+ sql("CALL sys.compact(table => 'T', where => 'pt = 1')")
+ checkAnswer(
+ sql("select distinct partition, level from `T$files` order by
partition"),
+ Seq(Row("{0}", 5), Row("{1}", 5))
+ )
+ }
+
test("Paimon Procedure: cluster with deletion vectors") {
failAfter(Span(5, org.scalatest.time.Minutes)) {
withTempDir {