This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new da30f3e9a1 [spark] Fix DELETE with range condition in v2 write (#7104)
da30f3e9a1 is described below

commit da30f3e9a1ea007beb273a0b59624b5f6261f9b2
Author: Kerwin Zhang <[email protected]>
AuthorDate: Fri Jan 23 12:05:23 2026 +0800

    [spark] Fix DELETE with range condition in v2 write (#7104)
---
 .../paimon/spark/rowops/PaimonCopyOnWriteScan.scala  | 20 ++++++++++----------
 .../rowops/PaimonSparkCopyOnWriteOperation.scala     |  3 +--
 .../paimon/spark/sql/DeleteFromTableTestBase.scala   | 13 +++++++++++++
 3 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
index 033d9eb569..11bd72e360 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
@@ -19,13 +19,13 @@
 package org.apache.paimon.spark.rowops
 
 import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.predicate.{And, CompoundPredicate, Predicate, PredicateBuilder}
 import org.apache.paimon.spark.commands.SparkDataFileMeta
 import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.scan.BaseScan
 import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.source.{DataSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
 
 import org.apache.spark.sql.PaimonUtils
 import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
@@ -39,11 +39,15 @@ import java.nio.file.Paths
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
+/**
+ * Note: The [[pushedPartitionFilters]] and [[pushedDataFilters]] are intentionally set to empty
+ * because file-level filtering is handled through Spark's runtime V2 filtering mechanism.
+ */
 case class PaimonCopyOnWriteScan(
     table: FileStoreTable,
     requiredSchema: StructType,
-    pushedPartitionFilters: Seq[PartitionPredicate],
-    pushedDataFilters: Seq[Predicate])
+    pushedPartitionFilters: Seq[PartitionPredicate] = Seq.empty,
+    pushedDataFilters: Seq[Predicate] = Seq.empty)
   extends BaseScan
   with SupportsRuntimeV2Filtering {
 
@@ -79,13 +83,9 @@ case class PaimonCopyOnWriteScan(
     if (table.coreOptions().manifestDeleteFileDropStats()) {
       snapshotReader.dropStats()
     }
-    if (pushedPartitionFilters.nonEmpty) {
-      snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
-    }
-    if (pushedDataFilters.nonEmpty) {
-      snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
-    }
+
     snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
     dataSplits = snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
   }
+
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
index e415e5cbf7..e1e2ddd4d9 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
@@ -42,8 +42,7 @@ class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: RowLevelOpera
         PaimonSparkCopyOnWriteOperation.this.table.copy(options.asCaseSensitiveMap)
 
       override def build(): Scan = {
-        val scan =
-          PaimonCopyOnWriteScan(table, requiredSchema, pushedPartitionFilters, pushedDataFilters)
+        val scan = PaimonCopyOnWriteScan(table, requiredSchema)
         PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
         scan
       }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 9daa975cba..d1ae7c331d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -518,4 +518,17 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
       assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
     }
   }
+
+  test("Paimon delete: delete with range condition") {
+    withTable("t") {
+      sql(s"CREATE TABLE t (id INT, v INT)")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id FROM range (1, 50000)")
+      sql("DELETE FROM t WHERE id >= 111 and id <= 444")
+
+      checkAnswer(
+        sql("SELECT count(*) FROM t"),
+        Row(49665)
+      )
+    }
+  }
 }
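
For context on the doc comment added to PaimonCopyOnWriteScan above: the scan relies on
Spark's SupportsRuntimeV2Filtering contract rather than statically pushed partition/data
filters. The following is a minimal, hypothetical sketch (the class name, constructor
parameters, and IN-predicate handling are assumptions, not the actual Paimon code) of how
such a scan can collect matching file names from the predicates Spark supplies at runtime,
which is what allows pushedPartitionFilters and pushedDataFilters to default to empty.

    import java.nio.file.Paths

    import scala.collection.mutable

    import org.apache.spark.sql.connector.expressions.{Expressions, Literal, NamedReference}
    import org.apache.spark.sql.connector.expressions.filter.Predicate
    import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
    import org.apache.spark.sql.types.StructType

    // Hypothetical example: names and structure are illustrative, not Paimon's API.
    class FileNameRuntimeFilteringScan(schema: StructType, filePathColumn: String)
      extends Scan
      with SupportsRuntimeV2Filtering {

      // File names that survive runtime filtering; split planning keeps only these files.
      val filteredFileNames: mutable.Set[String] = mutable.Set.empty[String]

      override def readSchema(): StructType = schema

      // Advertise the file-path metadata column as the attribute runtime filters may target.
      override def filterAttributes(): Array[NamedReference] =
        Array(Expressions.column(filePathColumn))

      // Spark calls this after evaluating the row-level condition, typically with an IN
      // predicate whose literals are the paths of files that contain matching rows.
      override def filter(predicates: Array[Predicate]): Unit = {
        predicates.filter(_.name() == "IN").foreach { in =>
          in.children().foreach {
            case lit: Literal[_] =>
              filteredFileNames += Paths.get(lit.value().toString).getFileName.toString
            case _ => // skip the column reference child
          }
        }
      }
    }

With the file names collected this way, split planning only needs the
withDataFileNameFilter call shown in the hunk above, so no static filter pushdown is
required.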
