This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bb6f49dbe [spark] Support DELETE on Paimon append-only table in spark 
V2 write (#6704)
6bb6f49dbe is described below

commit 6bb6f49dbe8ebc3249a1b8ea43da352d3905737b
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Dec 17 11:04:14 2025 +0800

    [spark] Support DELETE on Paimon append-only table in spark V2 write (#6704)
---
 .../scala/org/apache/paimon/spark/SparkTable.scala |   6 --
 .../scala/org/apache/paimon/spark/SparkTable.scala |   6 --
 .../scala/org/apache/paimon/spark/SparkTable.scala |   6 --
 .../paimon/spark/sql/V2DeleteFromTableTest.scala}  |  18 ++--
 .../paimon/spark/PaimonCopyOnWriteScan.scala       | 104 +++++++++++++++++++
 .../spark/PaimonSparkCopyOnWriteOperation.scala    | 104 +++++++++++++++++++
 ...a => PaimonSparkRowLevelOperationBuilder.scala} |  13 +--
 .../apache/paimon/spark/PaimonSparkTableBase.scala |   2 +-
 .../scala/org/apache/paimon/spark/SparkTable.scala |  12 ++-
 .../catalyst/analysis/PaimonDeleteTable.scala      |  34 ++++++-
 .../paimon/spark/write/BaseV2WriteBuilder.scala    |  10 ++
 .../apache/paimon/spark/write/PaimonV2Write.scala  | 111 +++++++++++++++++++--
 .../paimon/spark/write/PaimonV2WriteBuilder.scala  |  13 ++-
 .../paimon/spark/sql/DeleteFromTableTestBase.scala |  23 +++++
 14 files changed, 414 insertions(+), 48 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/SparkTable.scala
similarity index 85%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..284426b615 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,9 +22,3 @@ import org.apache.paimon.table.Table
 
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
 case class SparkTable(override val table: Table) extends 
PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/SparkTable.scala
similarity index 85%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to 
paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..284426b615 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,9 +22,3 @@ import org.apache.paimon.table.Table
 
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
 case class SparkTable(override val table: Table) extends 
PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/SparkTable.scala
similarity index 85%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to 
paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..284426b615 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,9 +22,3 @@ import org.apache.paimon.table.Table
 
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
 case class SparkTable(override val table: Table) extends 
PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
similarity index 66%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to 
paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
index 69f134196f..947fe503b1 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
@@ -16,15 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
 
-import org.apache.paimon.table.Table
+import org.apache.spark.SparkConf
 
-/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(override val table: Table) extends 
PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
new file mode 100644
index 0000000000..7e45f4be1b
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.table.{FileStoreTable, InnerTable}
+import org.apache.paimon.table.source.{DataSplit, Split}
+
+import org.apache.spark.sql.PaimonUtils
+import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.{Predicate => 
SparkPredicate}
+import org.apache.spark.sql.connector.read.{Batch, SupportsRuntimeV2Filtering}
+import org.apache.spark.sql.sources.{Filter, In}
+import org.apache.spark.sql.types.StructType
+
+import java.nio.file.Paths
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class PaimonCopyOnWriteScan(
+    table: InnerTable,
+    requiredSchema: StructType,
+    pushedPartitionFilters: Seq[PartitionPredicate],
+    pushedDataFilters: Seq[Predicate],
+    bucketedScanDisabled: Boolean = false)
+  extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
+  with SupportsRuntimeV2Filtering {
+
+  var filteredLocations: mutable.Set[String] = mutable.Set[String]()
+
+  var filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+
+  var dataSplits: Array[DataSplit] = Array()
+
+  def disableBucketedScan(): PaimonCopyOnWriteScan = {
+    copy(bucketedScanDisabled = true)
+  }
+
+  override def filterAttributes(): Array[NamedReference] = {
+    Array(Expressions.column(FILE_PATH_COLUMN))
+  }
+
+  override def filter(predicates: Array[SparkPredicate]): Unit = {
+    val runtimefilters: Array[Filter] = 
predicates.flatMap(PaimonUtils.filterV2ToV1)
+    for (filter <- runtimefilters) {
+      filter match {
+        case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
+          for (value <- in.values) {
+            val location = value.asInstanceOf[String]
+            filteredLocations.add(location)
+            filteredFileNames.add(Paths.get(location).getFileName.toString)
+          }
+        case _ => logWarning("Unsupported runtime filter")
+      }
+    }
+
+    table match {
+      case fileStoreTable: FileStoreTable =>
+        val snapshotReader = fileStoreTable.newSnapshotReader()
+        if (fileStoreTable.coreOptions().manifestDeleteFileDropStats()) {
+          snapshotReader.dropStats()
+        }
+
+        pushedPartitionFilters.foreach(snapshotReader.withPartitionFilter)
+
+        pushedDataFilters.foreach(snapshotReader.withFilter)
+
+        snapshotReader.withDataFileNameFilter(fileName => 
filteredFileNames.contains(fileName))
+
+        dataSplits =
+          snapshotReader.read().splits().asScala.collect { case s: DataSplit 
=> s }.toArray
+
+      case _ => throw new RuntimeException("Only FileStoreTable support.")
+    }
+
+  }
+
+  override def toBatch: Batch = {
+    PaimonBatch(
+      getInputPartitions(dataSplits.asInstanceOf[Array[Split]]),
+      readBuilder,
+      coreOptions.blobAsDescriptor(),
+      metadataColumns)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
new file mode 100644
index 0000000000..96256468e9
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.BucketFunctionType
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalog.functions.BucketFunction
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.spark.write.PaimonV2WriteBuilder
+import org.apache.paimon.table.{FileStoreTable, InnerTable, Table}
+import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, 
POSTPONE_MODE}
+
+import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, 
RowLevelOperation, RowLevelOperationInfo, WriteBuilder}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PaimonSparkCopyOnWriteOperation(table: Table, info: 
RowLevelOperationInfo)
+  extends RowLevelOperation {
+
+  private lazy val coreOptions = new CoreOptions(table.options())
+
+  private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
+  private lazy val useV2Write: Boolean = {
+    val v2WriteConfigured = OptionUtils.useV2Write()
+    v2WriteConfigured && supportsV2Write
+  }
+
+  private def supportsV2Write: Boolean = {
+    coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
+      table match {
+        case storeTable: FileStoreTable =>
+          storeTable.bucketMode() match {
+            case HASH_FIXED => BucketFunction.supportsTable(storeTable)
+            case BUCKET_UNAWARE | POSTPONE_MODE => true
+            case _ => false
+          }
+
+        case _ => false
+      }
+    } && coreOptions.clusteringColumns().isEmpty
+  }
+
+  override def command(): RowLevelOperation.Command = info.command()
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
+    table match {
+      case t: InnerTable =>
+        new 
PaimonScanBuilder(t.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable]) {
+          override def build(): Scan = {
+            val scan =
+              PaimonCopyOnWriteScan(t, requiredSchema, pushedPartitionFilters, 
pushedDataFilters)
+            PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
+            scan
+          }
+        }
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Scan is only supported for InnerTable. " +
+            s"Actual table type: 
${Option(table).map(_.getClass.getSimpleName).getOrElse("null")}"
+        )
+    }
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    table match {
+      case fileStoreTable: FileStoreTable if useV2Write =>
+        val options = Options.fromMap(info.options)
+        val builder = new PaimonV2WriteBuilder(fileStoreTable, info.schema(), 
options)
+        builder.overwriteFiles(copyOnWriteScan)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Write operation is only supported for FileStoreTable with V2 write 
enabled. " +
+            s"Actual table type: ${table.getClass.getSimpleName}, useV2Write: 
$useV2Write"
+        )
+    }
+  }
+
+  override def requiredMetadataAttributes(): Array[NamedReference] = {
+    val attributes: Seq[NamedReference] = Seq.empty
+    val updatedAttributes = attributes :+ Expressions.column(FILE_PATH_COLUMN)
+    updatedAttributes.toArray
+  }
+
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
similarity index 71%
copy from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
index 69f134196f..57f805cb13 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
@@ -20,11 +20,12 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.table.Table
 
-/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(override val table: Table) extends 
PaimonSparkTableBase(table) {}
+import org.apache.spark.sql.connector.write.{RowLevelOperation, 
RowLevelOperationBuilder, RowLevelOperationInfo}
 
-case class SparkIcebergTable(table: Table) extends BaseTable
+class PaimonSparkRowLevelOperationBuilder(table: Table, info: 
RowLevelOperationInfo)
+  extends RowLevelOperationBuilder {
 
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
+  override def build(): RowLevelOperation = {
+    new PaimonSparkCopyOnWriteOperation(table, info)
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 859becf546..22758dea0f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -45,7 +45,7 @@ abstract class PaimonSparkTableBase(val table: Table)
 
   lazy val coreOptions = new CoreOptions(table.options())
 
-  private lazy val useV2Write: Boolean = {
+  lazy val useV2Write: Boolean = {
     val v2WriteConfigured = OptionUtils.useV2Write()
     v2WriteConfigured && supportsV2Write
   }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..f0e8f382fe 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -20,8 +20,18 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.table.Table
 
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, 
RowLevelOperationInfo}
+
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(override val table: Table) extends 
PaimonSparkTableBase(table) {}
+case class SparkTable(override val table: Table)
+  extends PaimonSparkTableBase(table)
+  with SupportsRowLevelOperations {
+  override def newRowLevelOperationBuilder(
+      rowLevelOperationInfo: RowLevelOperationInfo): RowLevelOperationBuilder 
= {
+    new PaimonSparkRowLevelOperationBuilder(table, rowLevelOperationInfo)
+  }
+}
 
 case class SparkIcebergTable(table: Table) extends BaseTable
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 4a895a517d..15781cff90 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.catalyst.analysis
 
+import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
@@ -28,11 +29,42 @@ import scala.collection.JavaConverters._
 
 object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
+  /**
+   * Determines if DataSourceV2 delete is not supported for the given table. 
DataSourceV2 delete is
+   * not supported in the following scenarios:
+   *   - Spark version is 3.4 or earlier
+   *   - Table does not use V2 write
+   *   - Row tracking is enabled
+   *   - Deletion vectors are enabled
+   *   - Table has primary keys defined
+   *   - Table is not a FileStoreTable
+   *   - Data evolution is enabled
+   */
+  private def shouldFallbackToV1Delete(table: SparkTable): Boolean = {
+    val baseTable = table.getTable
+
+    val baseConditions = org.apache.spark.SPARK_VERSION <= "3.4" ||
+      !table.useV2Write ||
+      table.coreOptions.rowTrackingEnabled() ||
+      table.coreOptions.deletionVectorsEnabled() ||
+      !baseTable.primaryKeys().isEmpty
+
+    baseConditions || {
+      baseTable match {
+        case paimonTable: FileStoreTable =>
+          paimonTable.coreOptions().dataEvolutionEnabled()
+        case _ =>
+          true
+      }
+    }
+  }
+
   override val operation: RowLevelOp = Delete
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperators {
-      case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved 
=>
+      case d @ DeleteFromTable(PaimonRelation(table), condition)
+          if d.resolved && shouldFallbackToV1Delete(table) =>
         checkPaimonTable(table.getTable)
 
         table.getTable match {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
index 977e4510cb..17c8e9800f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.spark.write
 
+import org.apache.paimon.spark.PaimonCopyOnWriteScan
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, 
SupportsOverwrite, WriteBuilder}
@@ -32,6 +33,15 @@ abstract class BaseV2WriteBuilder(table: Table)
   protected var overwriteDynamic = false
   protected var overwritePartitions: Option[Map[String, String]] = None
 
+  protected var isOverwriteFiles = false
+  protected var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
+  def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): WriteBuilder = {
+    this.isOverwriteFiles = true
+    this.copyOnWriteScan = scan
+    this
+  }
+
   override def overwrite(filters: Array[Filter]): WriteBuilder = {
     if (overwriteDynamic) {
       throw new IllegalArgumentException("Cannot overwrite dynamically and by 
filter both")
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 79c2d5c8fc..ba89e84e2e 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -19,13 +19,16 @@
 package org.apache.paimon.spark.write
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark._
 import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.commands.{SchemaHelper, SparkDataFileMeta}
+import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
TableWriteImpl}
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageImpl, TableWriteImpl}
+import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.PaimonSparkSession
@@ -38,6 +41,8 @@ import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 
 class PaimonV2Write(
@@ -55,8 +60,18 @@ class PaimonV2Write(
     !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
     "Cannot overwrite dynamically and by filter both")
 
+  private var isOverwriteFiles = false
+
+  private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
   private val writeSchema = mergeSchema(dataSchema, options)
 
+  def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): PaimonV2Write = {
+    this.isOverwriteFiles = true
+    this.copyOnWriteScan = scan
+    this
+  }
+
   updateTableWithOptions(
     Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> 
overwriteDynamic.toString))
 
@@ -74,8 +89,13 @@ class PaimonV2Write(
     ordering
   }
 
-  override def toBatch: BatchWrite =
-    PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
+  override def toBatch: BatchWrite = {
+    if (isOverwriteFiles) {
+      CopyOnWriteBatchWrite(table, writeSchema, dataSchema, 
overwritePartitions, copyOnWriteScan)
+    } else {
+      PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
+    }
+  }
 
   override def supportedCustomMetrics(): Array[CustomMetric] = {
     Array(
@@ -113,12 +133,19 @@ private case class PaimonBatchWrite(
     writeSchema: StructType,
     dataSchema: StructType,
     overwritePartitions: Option[Map[String, String]])
+  extends PaimonBatchWriteBase(table, writeSchema, dataSchema, 
overwritePartitions) {}
+
+abstract class PaimonBatchWriteBase(
+    table: FileStoreTable,
+    writeSchema: StructType,
+    dataSchema: StructType,
+    overwritePartitions: Option[Map[String, String]])
   extends BatchWrite
   with WriteHelper {
 
-  private val metricRegistry = SparkMetricRegistry()
+  protected val metricRegistry = SparkMetricRegistry()
 
-  private val batchWriteBuilder = {
+  protected val batchWriteBuilder = {
     val builder = table.newBatchWriteBuilder()
     overwritePartitions.foreach(partitions => 
builder.withOverwrite(partitions.asJava))
     builder
@@ -154,7 +181,7 @@ private case class PaimonBatchWrite(
 
   // Spark support v2 write driver metrics since 4.0, see 
https://github.com/apache/spark/pull/48573
   // To ensure compatibility with 3.x, manually post driver metrics here 
instead of using Spark's API.
-  private def postDriverMetrics(): Unit = {
+  protected def postDriverMetrics(): Unit = {
     val spark = PaimonSparkSession.active
     // todo: find a more suitable way to get metrics.
     val commitMetrics = metricRegistry.buildSparkCommitMetrics()
@@ -171,6 +198,76 @@ private case class PaimonBatchWrite(
   }
 }
 
+private case class CopyOnWriteBatchWrite(
+    table: FileStoreTable,
+    writeSchema: StructType,
+    dataSchema: StructType,
+    overwritePartitions: Option[Map[String, String]],
+    scan: Option[PaimonCopyOnWriteScan])
+  extends PaimonBatchWriteBase(table, writeSchema, dataSchema, 
overwritePartitions) {
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"CopyOnWrite committing to table ${table.name()}")
+
+    val batchTableCommit = batchWriteBuilder.newCommit()
+
+    try {
+      if (scan.isEmpty) {
+        batchTableCommit.truncateTable()
+      } else {
+        val touchedFiles = candidateFiles(scan.get.dataSplits)
+
+        val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+        val addCommitMessages = WriteTaskResult.merge(messages)
+
+        val commitMessages = addCommitMessages ++ deletedCommitMessage
+
+        batchTableCommit.withMetricRegistry(metricRegistry)
+        val start = System.currentTimeMillis()
+        batchTableCommit.commit(commitMessages.asJava)
+        logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+        postCommit(commitMessages)
+      }
+    } finally {
+      batchTableCommit.close()
+      postDriverMetrics()
+    }
+  }
+
+  private def candidateFiles(candidateDataSplits: Seq[DataSplit]): 
Array[SparkDataFileMeta] = {
+    val totalBuckets = coreOptions.bucket()
+    candidateDataSplits
+      .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, 
totalBuckets))
+      .toArray
+  }
+
+  private def buildDeletedCommitMessage(
+      deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
+    deletedFiles
+      .groupBy(f => (f.partition, f.bucket))
+      .map {
+        case ((partition, bucket), files) =>
+          val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+
+          new CommitMessageImpl(
+            partition,
+            bucket,
+            files.head.totalBuckets,
+            new DataIncrement(
+              Collections.emptyList[DataFileMeta],
+              deletedDataFileMetas,
+              Collections.emptyList[DataFileMeta]),
+            new CompactIncrement(
+              Collections.emptyList[DataFileMeta],
+              Collections.emptyList[DataFileMeta],
+              Collections.emptyList[DataFileMeta])
+          )
+      }
+      .toSeq
+  }
+}
+
 private case class WriterFactory(
     writeSchema: StructType,
     dataSchema: StructType,
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
index b11ca13920..f8036ff40a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
@@ -22,15 +22,20 @@ import org.apache.paimon.options.Options
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.RowType
 
-import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, 
SupportsOverwrite, WriteBuilder}
-import org.apache.spark.sql.sources.{And, Filter}
 import org.apache.spark.sql.types.StructType
 
 class PaimonV2WriteBuilder(table: FileStoreTable, dataSchema: StructType, 
options: Options)
   extends BaseV2WriteBuilder(table) {
 
-  override def build =
-    new PaimonV2Write(table, overwriteDynamic, overwritePartitions, 
dataSchema, options)
+  override def build = {
+    val paimonV2Write =
+      new PaimonV2Write(table, overwriteDynamic, overwritePartitions, 
dataSchema, options)
+    if (isOverwriteFiles) {
+      paimonV2Write.overwriteFiles(copyOnWriteScan)
+    } else {
+      paimonV2Write
+    }
+  }
 
   override def partitionRowType(): RowType = 
table.schema().logicalPartitionType()
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index ea05d94dd3..3113b905f8 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -52,6 +52,29 @@ abstract class DeleteFromTableTestBase extends 
PaimonSparkTestBase {
     )
   }
 
+  test(s"Paimon Delete: append-only table, no match and full delete 
scenarios") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), 
(4, 'd', '2025')
+                |""".stripMargin)
+
+    spark.sql("DELETE FROM T WHERE name = 'e'")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", 
"2025")).toDF()
+    )
+
+    spark.sql("DELETE FROM T")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      spark.emptyDataFrame
+    )
+  }
+
   test(s"Paimon Delete: append-only table with partition") {
     spark.sql(s"""
                  |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED 
BY (dt)

Reply via email to