This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 87bafe8071 [spark] Add partition filter for incremental clustering procedure (#6640)
87bafe8071 is described below

commit 87bafe8071b24394f9ec466bf637a78695d85d96
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Nov 20 16:05:55 2025 +0800

    [spark] Add partition filter for incremental clustering procedure (#6640)
---
 .../paimon/spark/procedure/CompactProcedure.java   | 11 +++++--
 .../spark/procedure/CompactProcedureTestBase.scala | 34 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 3 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 93305a2aec..f7ae50c3b1 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -268,7 +268,8 @@ public class CompactProcedure extends BaseProcedure {
                     break;
                 case BUCKET_UNAWARE:
                     if (clusterIncrementalEnabled) {
-                        clusterIncrementalUnAwareBucketTable(table, fullCompact, relation);
+                        clusterIncrementalUnAwareBucketTable(
+                                table, partitionPredicate, fullCompact, relation);
                     } else {
                         compactUnAwareBucketTable(
                                 table, partitionPredicate, partitionIdleTime, javaSparkContext);
@@ -530,8 +531,12 @@ public class CompactProcedure extends BaseProcedure {
     }
 
     private void clusterIncrementalUnAwareBucketTable(
-            FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) {
-        IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
+            FileStoreTable table,
+            @Nullable PartitionPredicate partitionPredicate,
+            boolean fullCompaction,
+            DataSourceV2Relation relation) {
+        IncrementalClusterManager incrementalClusterManager =
+                new IncrementalClusterManager(table, partitionPredicate);
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.createCompactUnits(fullCompaction);
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 81e475e983..e89eba2e85 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -1139,6 +1139,40 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     }
   }
 
+  test("Paimon Procedure: cluster for partitioned table with partition filter") {
+    sql(
+      """
+        |CREATE TABLE T (a INT, b INT, pt INT)
+        |PARTITIONED BY (pt)
+        |TBLPROPERTIES (
+        |  'bucket'='-1', 'num-levels'='6', 'num-sorted-run.compaction-trigger'='2',
+        |  'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true'
+        |)
+        |""".stripMargin)
+
+    sql("INSERT INTO T VALUES (0, 0, 0), (0, 0, 1)")
+    sql("INSERT INTO T VALUES (0, 1, 0), (0, 1, 1)")
+    sql("INSERT INTO T VALUES (0, 2, 0), (0, 2, 1)")
+    sql("INSERT INTO T VALUES (1, 0, 0), (1, 0, 1)")
+    sql("INSERT INTO T VALUES (1, 1, 0), (1, 1, 1)")
+    sql("INSERT INTO T VALUES (1, 2, 0), (1, 2, 1)")
+    sql("INSERT INTO T VALUES (2, 0, 0), (2, 0, 1)")
+    sql("INSERT INTO T VALUES (2, 1, 0), (2, 1, 1)")
+    sql("INSERT INTO T VALUES (2, 2, 0), (2, 2, 1)")
+
+    sql("CALL sys.compact(table => 'T', where => 'pt = 0')")
+    checkAnswer(
+      sql("select distinct partition, level from `T$files` order by partition"),
+      Seq(Row("{0}", 5), Row("{1}", 0))
+    )
+
+    sql("CALL sys.compact(table => 'T', where => 'pt = 1')")
+    checkAnswer(
+      sql("select distinct partition, level from `T$files` order by partition"),
+      Seq(Row("{0}", 5), Row("{1}", 5))
+    )
+  }
+
   test("Paimon Procedure: cluster with deletion vectors") {
     failAfter(Span(5, org.scalatest.time.Minutes)) {
       withTempDir {

Reply via email to