This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new da30f3e9a1 [spark] Fix DELETE with range condition in v2 write (#7104)
da30f3e9a1 is described below
commit da30f3e9a1ea007beb273a0b59624b5f6261f9b2
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri Jan 23 12:05:23 2026 +0800
[spark] Fix DELETE with range condition in v2 write (#7104)
---
.../paimon/spark/rowops/PaimonCopyOnWriteScan.scala | 20 ++++++++++----------
.../rowops/PaimonSparkCopyOnWriteOperation.scala | 3 +--
.../paimon/spark/sql/DeleteFromTableTestBase.scala | 13 +++++++++++++
3 files changed, 24 insertions(+), 12 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
index 033d9eb569..11bd72e360 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
@@ -19,13 +19,13 @@
package org.apache.paimon.spark.rowops
import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.predicate.{And, CompoundPredicate, Predicate, PredicateBuilder}
import org.apache.paimon.spark.commands.SparkDataFileMeta
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.scan.BaseScan
import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
import org.apache.spark.sql.PaimonUtils
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
@@ -39,11 +39,15 @@ import java.nio.file.Paths
import scala.collection.JavaConverters._
import scala.collection.mutable
+/**
+ * Note: The [[pushedPartitionFilters]] and [[pushedDataFilters]] are intentionally set to empty
+ * because file-level filtering is handled through Spark's runtime V2 filtering mechanism.
+ */
case class PaimonCopyOnWriteScan(
table: FileStoreTable,
requiredSchema: StructType,
- pushedPartitionFilters: Seq[PartitionPredicate],
- pushedDataFilters: Seq[Predicate])
+ pushedPartitionFilters: Seq[PartitionPredicate] = Seq.empty,
+ pushedDataFilters: Seq[Predicate] = Seq.empty)
extends BaseScan
with SupportsRuntimeV2Filtering {
@@ -79,13 +83,9 @@ case class PaimonCopyOnWriteScan(
if (table.coreOptions().manifestDeleteFileDropStats()) {
snapshotReader.dropStats()
}
- if (pushedPartitionFilters.nonEmpty) {
- snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
- }
- if (pushedDataFilters.nonEmpty) {
- snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
- }
+
snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
}
+
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
index e415e5cbf7..e1e2ddd4d9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
@@ -42,8 +42,7 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: RowLevelOpera
PaimonSparkCopyOnWriteOperation.this.table.copy(options.asCaseSensitiveMap)
override def build(): Scan = {
- val scan =
- PaimonCopyOnWriteScan(table, requiredSchema, pushedPartitionFilters, pushedDataFilters)
+ val scan = PaimonCopyOnWriteScan(table, requiredSchema)
PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
scan
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 9daa975cba..d1ae7c331d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -518,4 +518,17 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
}
}
+
+ test("Paimon delete: delete with range condition") {
+ withTable("t") {
+ sql(s"CREATE TABLE t (id INT, v INT)")
+ sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id FROM range (1,
50000)")
+ sql("DELETE FROM t WHERE id >= 111 and id <= 444")
+
+ checkAnswer(
+ sql("SELECT count(*) FROM t"),
+ Row(49665)
+ )
+ }
+ }
}
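
For readers unfamiliar with the "runtime V2 filtering mechanism" that the new Scaladoc in PaimonCopyOnWriteScan refers to, below is a minimal, self-contained sketch of a Spark data source scan that declares no pushed-down filters and instead prunes data files from the predicates Spark supplies at runtime. It only illustrates the Spark API involved (SupportsRuntimeV2Filtering), not the actual Paimon implementation; the "_file_path" column name, the class name, and the IN-predicate handling are assumptions made for this example.

import org.apache.spark.sql.connector.expressions.{Expressions, Literal, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
import org.apache.spark.sql.types.StructType

import scala.collection.mutable

// Hypothetical scan: no partition/data filters are pushed down; instead the
// set of data files to rewrite is derived from runtime V2 filters over a
// file-path metadata column ("_file_path" is assumed for the example).
class FileNameFilteringScan(schema: StructType) extends Scan with SupportsRuntimeV2Filtering {

  private val filteredFileNames = mutable.Set.empty[String]

  override def readSchema(): StructType = schema

  // Tell Spark which attribute runtime filters may be expressed on.
  override def filterAttributes(): Array[NamedReference] =
    Array(Expressions.column("_file_path"))

  // Spark calls this at runtime with predicates on the declared attribute;
  // an IN predicate carries the matching file paths as literal children.
  override def filter(predicates: Array[Predicate]): Unit = {
    predicates.foreach { p =>
      if (p.name() == "IN") {
        p.children()
          .collect { case lit: Literal[_] => String.valueOf(lit.value()) }
          .foreach(filteredFileNames.add)
      }
    }
  }
}

In the committed code the collected names play the role of filteredFileNames, which the scan then applies via snapshotReader.withDataFileNameFilter (see the hunk above), so the pushed filters can safely default to empty.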