This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/release-1.3 by this push:
new 0df7a712d7 [spark] Fix sort compact with partition filter (#6371)
0df7a712d7 is described below
commit 0df7a712d7e4f374ca34cb8487053918f679466f
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 9 23:17:46 2025 +0800
[spark] Fix sort compact with partition filter (#6371)
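In brief: the condition passed via the procedure's `where` argument is a
partition predicate, so the sort-compaction path now applies it with
SnapshotReader#withPartitionFilter instead of the row-level
SnapshotReader#withFilter. A minimal Spark SQL sketch mirroring the test
added in this commit (table and column names are taken from that test, and
a SparkSession `spark` with the Paimon catalog configured is assumed):

    spark.sql("CREATE TABLE t (a INT, pt INT) PARTITIONED BY (pt)")
    spark.sql("INSERT INTO t VALUES (1, 1)")
    spark.sql("INSERT INTO t VALUES (2, 1)")
    spark.sql(
      "CALL sys.compact(table => 't', order_strategy => 'order', where => 'pt = 1', order_by => 'a')")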
---
.../org/apache/paimon/spark/procedure/CompactProcedure.java | 13 +++++++------
.../paimon/spark/procedure/CompactProcedureTestBase.scala | 13 +++++++++++++
2 files changed, 20 insertions(+), 6 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 299fb6f6d7..7788a5a0be 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -263,7 +263,7 @@ public class CompactProcedure extends BaseProcedure {
}
boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
RowType partitionType = table.schema().logicalPartitionType();
- Predicate filter =
+ Predicate partitionFilter =
condition == null
? null
: ExpressionUtils.convertConditionToPaimonPredicate(
@@ -273,7 +273,7 @@ public class CompactProcedure extends BaseProcedure {
false)
.getOrElse(null);
PartitionPredicate partitionPredicate =
- PartitionPredicate.fromPredicate(partitionType, filter);
+ PartitionPredicate.fromPredicate(partitionType, partitionFilter);
if (orderType.equals(OrderType.NONE)) {
JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
@@ -302,7 +302,8 @@ public class CompactProcedure extends BaseProcedure {
} else {
switch (bucketMode) {
case BUCKET_UNAWARE:
- sortCompactUnAwareBucketTable(table, orderType, sortColumns, relation, filter);
+ sortCompactUnAwareBucketTable(
+ table, orderType, sortColumns, relation, partitionFilter);
break;
default:
throw new UnsupportedOperationException(
@@ -521,10 +522,10 @@ public class CompactProcedure extends BaseProcedure {
OrderType orderType,
List<String> sortColumns,
DataSourceV2Relation relation,
- @Nullable Predicate filter) {
+ @Nullable Predicate partitionFilter) {
SnapshotReader snapshotReader = table.newSnapshotReader();
- if (filter != null) {
- snapshotReader.withFilter(filter);
+ if (partitionFilter != null) {
+ snapshotReader.withPartitionFilter(partitionFilter);
}
Map<BinaryRow, DataSplit[]> packedSplits = packForSort(snapshotReader.read().dataSplits());
TableSorter sorter = TableSorter.getSorter(table, orderType, sortColumns);
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
index 96f85ba757..5f653695d0 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala
@@ -343,6 +343,19 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
}
}
+ test("Paimon Procedure: sort compact with partition filter") {
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, pt INT) PARTITIONED BY (pt)")
+ sql("INSERT INTO t VALUES (1, 1)")
+ sql("INSERT INTO t VALUES (2, 1)")
+ sql(
+ "CALL sys.compact(table => 't', order_strategy => 'order', where =>
'pt = 1', order_by => 'a')")
+ val table = loadTable("t")
+ assert(table.latestSnapshot().get().commitKind.equals(CommitKind.OVERWRITE))
+ checkAnswer(sql("SELECT * FROM t ORDER BY a"), Seq(Row(1, 1), Row(2, 1)))
+ }
+ }
+
test("Paimon Procedure: compact for pk") {
failAfter(streamingTimeout) {
withTempDir {