This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/release-1.3 by this push:
     new 0df7a712d7 [spark] Fix sort compact with partition filter (#6371)
0df7a712d7 is described below

commit 0df7a712d7e4f374ca34cb8487053918f679466f
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 9 23:17:46 2025 +0800

    [spark] Fix sort compact with partition filter (#6371)
---
 .../org/apache/paimon/spark/procedure/CompactProcedure.java | 13 +++++++------
 .../paimon/spark/procedure/CompactProcedureTestBase.scala   | 13 +++++++++++++
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 299fb6f6d7..7788a5a0be 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -263,7 +263,7 @@ public class CompactProcedure extends BaseProcedure {
         }
         boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
         RowType partitionType = table.schema().logicalPartitionType();
-        Predicate filter =
+        Predicate partitionFilter =
                 condition == null
                         ? null
                         : ExpressionUtils.convertConditionToPaimonPredicate(
@@ -273,7 +273,7 @@ public class CompactProcedure extends BaseProcedure {
                                         false)
                                 .getOrElse(null);
         PartitionPredicate partitionPredicate =
-                PartitionPredicate.fromPredicate(partitionType, filter);
+                PartitionPredicate.fromPredicate(partitionType, 
partitionFilter);
 
         if (orderType.equals(OrderType.NONE)) {
             JavaSparkContext javaSparkContext = new 
JavaSparkContext(spark().sparkContext());
@@ -302,7 +302,8 @@ public class CompactProcedure extends BaseProcedure {
         } else {
             switch (bucketMode) {
                 case BUCKET_UNAWARE:
-                    sortCompactUnAwareBucketTable(table, orderType, 
sortColumns, relation, filter);
+                    sortCompactUnAwareBucketTable(
+                            table, orderType, sortColumns, relation, 
partitionFilter);
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -521,10 +522,10 @@ public class CompactProcedure extends BaseProcedure {
             OrderType orderType,
             List<String> sortColumns,
             DataSourceV2Relation relation,
-            @Nullable Predicate filter) {
+            @Nullable Predicate partitionFilter) {
         SnapshotReader snapshotReader = table.newSnapshotReader();
-        if (filter != null) {
-            snapshotReader.withFilter(filter);
+        if (partitionFilter != null) {
+            snapshotReader.withPartitionFilter(partitionFilter);
         }
         Map<BinaryRow, DataSplit[]> packedSplits = 
packForSort(snapshotReader.read().dataSplits());
         TableSorter sorter = TableSorter.getSorter(table, orderType, 
sortColumns);
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 96f85ba757..5f653695d0 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -343,6 +343,19 @@ abstract class CompactProcedureTestBase extends 
PaimonSparkTestBase with StreamT
     }
   }
 
+  test("Paimon Procedure: sort compact with partition filter") {
+    withTable("t") {
+      sql("CREATE TABLE t (a INT, pt INT) PARTITIONED BY (pt)")
+      sql("INSERT INTO t VALUES (1, 1)")
+      sql("INSERT INTO t VALUES (2, 1)")
+      sql(
+        "CALL sys.compact(table => 't', order_strategy => 'order', where => 
'pt = 1', order_by => 'a')")
+      val table = loadTable("t")
+      
assert(table.latestSnapshot().get().commitKind.equals(CommitKind.OVERWRITE))
+      checkAnswer(sql("SELECT * FROM t ORDER BY a"), Seq(Row(1, 1), Row(2, 1)))
+    }
+  }
+
   test("Paimon Procedure: compact for pk") {
     failAfter(streamingTimeout) {
       withTempDir {

Reply via email to