This is an automated email from the ASF dual-hosted git repository.

zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7d862898e0 [spark] Refactor spark v2 DELETE (#6851)
7d862898e0 is described below

commit 7d862898e0cee2efea52ab6e90f0b7c88b9d327b
Author: Zouxxyy <[email protected]>
AuthorDate: Sun Dec 21 15:38:45 2025 +0800

    [spark] Refactor spark v2 DELETE (#6851)
---
 .../generated/spark_connector_configuration.html   |   2 +-
 .../paimon/spark/sql/DeleteFromTableTest.scala     |   8 +
 .../paimon/spark/sql/V2DeleteFromTableTest.scala   |  28 ---
 .../paimon/spark/sql/DeleteFromTableTest.scala     |   8 +
 .../apache/paimon/spark/SparkConnectorOptions.java |   2 +-
 .../spark/PaimonSparkCopyOnWriteOperation.scala    | 105 ---------
 .../PaimonSparkRowLevelOperationBuilder.scala      |  31 ---
 .../scala/org/apache/paimon/spark/SparkTable.scala |  15 +-
 .../catalyst/analysis/PaimonDeleteTable.scala      |  44 ++--
 .../paimon/spark/format/PaimonFormatTable.scala    |  24 +-
 .../{scan => rowops}/PaimonCopyOnWriteScan.scala   |  45 ++--
 .../rowops/PaimonSparkCopyOnWriteOperation.scala   |  63 ++++++
 .../paimon/spark/write/BaseV2WriteBuilder.scala    |  43 ++--
 .../paimon/spark/write/PaimonBatchWrite.scala      | 133 +++++++++++
 .../paimon/spark/write/PaimonV2DataWriter.scala    |  80 +++++++
 .../apache/paimon/spark/write/PaimonV2Write.scala  | 252 +--------------------
 .../paimon/spark/write/PaimonV2WriteBuilder.scala  |  16 +-
 .../paimon/spark/write/PaimonWriteBuilder.scala    |   1 -
 .../paimon/spark/sql/DeleteFromTableTestBase.scala |  24 ++
 19 files changed, 406 insertions(+), 518 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html 
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 6f16a972c2..24baa2431f 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -96,7 +96,7 @@ under the License.
             <td><h5>write.use-v2-write</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>If true, v2 write will be used. Currently, only HASH_FIXED and 
BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other 
bucket modes. Currently, Spark V2 write does not support 
TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.</td>
+            <td>If true, v2 write will be used. Currently, only HASH_FIXED and 
BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other 
bucket modes. Currently, Spark V2 write does not support 
TableCapability.STREAMING_WRITE.</td>
         </tr>
     </tbody>
 </table>
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 09554a1dbf..ab33a40e59 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -18,4 +18,12 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.spark.SparkConf
+
 class DeleteFromTableTest extends DeleteFromTableTestBase {}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
deleted file mode 100644
index 947fe503b1..0000000000
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.sql
-
-import org.apache.spark.SparkConf
-
-class V2DeleteFromTableTest extends DeleteFromTableTestBase {
-  override protected def sparkConf: SparkConf = {
-    super.sparkConf
-      .set("spark.paimon.write.use-v2-write", "true")
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
index 09554a1dbf..ab33a40e59 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTest.scala
@@ -18,4 +18,12 @@
 
 package org.apache.paimon.spark.sql
 
+import org.apache.spark.SparkConf
+
 class DeleteFromTableTest extends DeleteFromTableTestBase {}
+
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.paimon.write.use-v2-write", "true")
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 05630caceb..13305637ee 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -51,7 +51,7 @@ public class SparkConnectorOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "If true, v2 write will be used. Currently, only 
HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 
write for other bucket modes. Currently, Spark V2 write does not support 
TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.");
+                            "If true, v2 write will be used. Currently, only 
HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 
write for other bucket modes. Currently, Spark V2 write does not support 
TableCapability.STREAMING_WRITE.");
 
     public static final ConfigOption<Integer> MAX_FILES_PER_TRIGGER =
             key("read.stream.maxFilesPerTrigger")
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
deleted file mode 100644
index b939ba8ef5..0000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.CoreOptions.BucketFunctionType
-import org.apache.paimon.options.Options
-import org.apache.paimon.spark.catalog.functions.BucketFunction
-import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
-import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.spark.write.PaimonV2WriteBuilder
-import org.apache.paimon.table.{FileStoreTable, InnerTable, Table}
-import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, 
POSTPONE_MODE}
-
-import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
-import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, 
RowLevelOperation, RowLevelOperationInfo, WriteBuilder}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
-
-class PaimonSparkCopyOnWriteOperation(table: Table, info: 
RowLevelOperationInfo)
-  extends RowLevelOperation {
-
-  private lazy val coreOptions = new CoreOptions(table.options())
-
-  private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
-
-  private lazy val useV2Write: Boolean = {
-    val v2WriteConfigured = OptionUtils.useV2Write()
-    v2WriteConfigured && supportsV2Write
-  }
-
-  private def supportsV2Write: Boolean = {
-    coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
-      table match {
-        case storeTable: FileStoreTable =>
-          storeTable.bucketMode() match {
-            case HASH_FIXED => BucketFunction.supportsTable(storeTable)
-            case BUCKET_UNAWARE | POSTPONE_MODE => true
-            case _ => false
-          }
-
-        case _ => false
-      }
-    } && coreOptions.clusteringColumns().isEmpty
-  }
-
-  override def command(): RowLevelOperation.Command = info.command()
-
-  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
-    table match {
-      case t: InnerTable =>
-        new 
PaimonScanBuilder(t.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable]) {
-          override def build(): Scan = {
-            val scan =
-              PaimonCopyOnWriteScan(t, requiredSchema, pushedPartitionFilters, 
pushedDataFilters)
-            PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
-            scan
-          }
-        }
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"Scan is only supported for InnerTable. " +
-            s"Actual table type: 
${Option(table).map(_.getClass.getSimpleName).getOrElse("null")}"
-        )
-    }
-  }
-
-  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
-    table match {
-      case fileStoreTable: FileStoreTable if useV2Write =>
-        val options = Options.fromMap(info.options)
-        val builder = new PaimonV2WriteBuilder(fileStoreTable, info.schema(), 
options)
-        builder.overwriteFiles(copyOnWriteScan)
-      case _ =>
-        throw new UnsupportedOperationException(
-          s"Write operation is only supported for FileStoreTable with V2 write 
enabled. " +
-            s"Actual table type: ${table.getClass.getSimpleName}, useV2Write: 
$useV2Write"
-        )
-    }
-  }
-
-  override def requiredMetadataAttributes(): Array[NamedReference] = {
-    val attributes: Seq[NamedReference] = Seq.empty
-    val updatedAttributes = attributes :+ Expressions.column(FILE_PATH_COLUMN)
-    updatedAttributes.toArray
-  }
-
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
deleted file mode 100644
index 57f805cb13..0000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.table.Table
-
-import org.apache.spark.sql.connector.write.{RowLevelOperation, 
RowLevelOperationBuilder, RowLevelOperationInfo}
-
-class PaimonSparkRowLevelOperationBuilder(table: Table, info: 
RowLevelOperationInfo)
-  extends RowLevelOperationBuilder {
-
-  override def build(): RowLevelOperation = {
-    new PaimonSparkCopyOnWriteOperation(table, info)
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index f0e8f382fe..740e7b5994 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -18,7 +18,8 @@
 
 package org.apache.paimon.spark
 
-import org.apache.paimon.table.Table
+import org.apache.paimon.spark.rowops.PaimonSparkCopyOnWriteOperation
+import org.apache.paimon.table.{FileStoreTable, Table}
 
 import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
 import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, 
RowLevelOperationInfo}
@@ -27,9 +28,17 @@ import 
org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelO
 case class SparkTable(override val table: Table)
   extends PaimonSparkTableBase(table)
   with SupportsRowLevelOperations {
+
   override def newRowLevelOperationBuilder(
-      rowLevelOperationInfo: RowLevelOperationInfo): RowLevelOperationBuilder 
= {
-    new PaimonSparkRowLevelOperationBuilder(table, rowLevelOperationInfo)
+      info: RowLevelOperationInfo): RowLevelOperationBuilder = {
+    table match {
+      case t: FileStoreTable if useV2Write =>
+        () => new PaimonSparkCopyOnWriteOperation(t, info)
+      case _ =>
+        throw new UnsupportedOperationException(
+          s"Write operation is only supported for FileStoreTable with V2 write 
enabled. " +
+            s"Actual table type: ${table.getClass.getSimpleName}, useV2Write: 
$useV2Write")
+    }
   }
 }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 15781cff90..46b4cc05b4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -22,41 +22,25 @@ import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
 import org.apache.paimon.table.FileStoreTable
 
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 
-import scala.collection.JavaConverters._
-
 object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
 
-  /**
-   * Determines if DataSourceV2 delete is not supported for the given table. 
DataSourceV2 delete is
-   * not supported in the following scenarios:
-   *   - Spark version is 3.4 or earlier
-   *   - Table does not use V2 write
-   *   - Row tracking is enabled
-   *   - Deletion vectors are enabled
-   *   - Table has primary keys defined
-   *   - Table is not a FileStoreTable
-   *   - Data evolution is enabled
-   */
-  private def shouldFallbackToV1Delete(table: SparkTable): Boolean = {
+  /** Determines if DataSourceV2 delete is not supported for the given table. 
*/
+  private def shouldFallbackToV1Delete(table: SparkTable, condition: 
Expression): Boolean = {
     val baseTable = table.getTable
-
-    val baseConditions = org.apache.spark.SPARK_VERSION <= "3.4" ||
-      !table.useV2Write ||
-      table.coreOptions.rowTrackingEnabled() ||
-      table.coreOptions.deletionVectorsEnabled() ||
-      !baseTable.primaryKeys().isEmpty
-
-    baseConditions || {
-      baseTable match {
-        case paimonTable: FileStoreTable =>
-          paimonTable.coreOptions().dataEvolutionEnabled()
-        case _ =>
-          true
-      }
-    }
+    org.apache.spark.SPARK_VERSION < "3.5" ||
+    !baseTable.isInstanceOf[FileStoreTable] ||
+    !baseTable.primaryKeys().isEmpty ||
+    !table.useV2Write ||
+    table.coreOptions.deletionVectorsEnabled() ||
+    table.coreOptions.rowTrackingEnabled() ||
+    table.coreOptions.dataEvolutionEnabled() ||
+    // todo: Optimize v2 delete when conditions are all partition filters
+    condition == null || condition == TrueLiteral
   }
 
   override val operation: RowLevelOp = Delete
@@ -64,7 +48,7 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with 
RowLevelHelper {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     plan.resolveOperators {
       case d @ DeleteFromTable(PaimonRelation(table), condition)
-          if d.resolved && shouldFallbackToV1Delete(table) =>
+          if d.resolved && shouldFallbackToV1Delete(table, condition) =>
         checkPaimonTable(table.getTable)
 
         table.getTable match {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
index 1669cd8353..fb53fafa25 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/format/PaimonFormatTable.scala
@@ -92,19 +92,16 @@ case class PaimonFormatTableWriterBuilder(table: 
FormatTable, writeSchema: Struc
 
 private case class FormatTableBatchWrite(
     table: FormatTable,
-    overwriteDynamic: Boolean,
+    overwriteDynamic: Option[Boolean],
     overwritePartitions: Option[Map[String, String]],
     writeSchema: StructType)
   extends BatchWrite
   with Logging {
 
-  assert(
-    !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
-    "Cannot overwrite dynamically and by filter both")
-
   private val batchWriteBuilder = {
     val builder = table.newBatchWriteBuilder()
-    if (overwriteDynamic) {
+    // todo: add test for static overwrite the whole table
+    if (overwriteDynamic.contains(true)) {
       builder.withOverwrite()
     } else {
       overwritePartitions.foreach(partitions => 
builder.withOverwrite(partitions.asJava))
@@ -112,8 +109,9 @@ private case class FormatTableBatchWrite(
     builder
   }
 
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
-    FormatTableWriterFactory(batchWriteBuilder, writeSchema)
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
+    (_: Int, _: Long) => new FormatTableDataWriter(batchWriteBuilder, 
writeSchema)
+  }
 
   override def useCommitCoordinator(): Boolean = false
 
@@ -140,16 +138,6 @@ private case class FormatTableBatchWrite(
   }
 }
 
-private case class FormatTableWriterFactory(
-    batchWriteBuilder: BatchWriteBuilder,
-    writeSchema: StructType)
-  extends DataWriterFactory {
-
-  override def createWriter(partitionId: Int, taskId: Long): 
DataWriter[InternalRow] = {
-    new FormatTableDataWriter(batchWriteBuilder, writeSchema)
-  }
-}
-
 private class FormatTableDataWriter(batchWriteBuilder: BatchWriteBuilder, 
writeSchema: StructType)
   extends V2DataWrite
   with Logging {
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
similarity index 68%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
index 3b4efbde0d..033d9eb569 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonCopyOnWriteScan.scala
@@ -16,12 +16,15 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.spark.scan
+package org.apache.paimon.spark.rowops
 
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.spark.commands.SparkDataFileMeta
+import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.spark.scan.BaseScan
 import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
-import org.apache.paimon.table.{FileStoreTable, InnerTable}
+import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.apache.spark.sql.PaimonUtils
@@ -37,7 +40,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 case class PaimonCopyOnWriteScan(
-    table: InnerTable,
+    table: FileStoreTable,
     requiredSchema: StructType,
     pushedPartitionFilters: Seq[PartitionPredicate],
     pushedDataFilters: Seq[Predicate])
@@ -45,15 +48,21 @@ case class PaimonCopyOnWriteScan(
   with SupportsRuntimeV2Filtering {
 
   override def inputSplits: Array[Split] = 
dataSplits.asInstanceOf[Array[Split]]
-  private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
 
   var dataSplits: Array[DataSplit] = Array()
 
+  def scannedFiles: Seq[SparkDataFileMeta] = {
+    dataSplits
+      .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, 
dataSplit.totalBuckets()))
+      .toSeq
+  }
+
   override def filterAttributes(): Array[NamedReference] = {
     Array(Expressions.column(FILE_PATH_COLUMN))
   }
 
   override def filter(predicates: Array[SparkPredicate]): Unit = {
+    val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
     val runtimefilters: Array[Filter] = 
predicates.flatMap(PaimonUtils.filterV2ToV1)
     for (filter <- runtimefilters) {
       filter match {
@@ -66,23 +75,17 @@ case class PaimonCopyOnWriteScan(
       }
     }
 
-    table match {
-      case fileStoreTable: FileStoreTable =>
-        val snapshotReader = fileStoreTable.newSnapshotReader()
-        if (fileStoreTable.coreOptions().manifestDeleteFileDropStats()) {
-          snapshotReader.dropStats()
-        }
-        if (pushedPartitionFilters.nonEmpty) {
-          
snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
-        }
-        if (pushedDataFilters.nonEmpty) {
-          
snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
-        }
-        snapshotReader.withDataFileNameFilter(fileName => 
filteredFileNames.contains(fileName))
-        dataSplits =
-          snapshotReader.read().splits().asScala.collect { case s: DataSplit 
=> s }.toArray
-
-      case _ => throw new RuntimeException("Only FileStoreTable support.")
+    val snapshotReader = table.newSnapshotReader()
+    if (table.coreOptions().manifestDeleteFileDropStats()) {
+      snapshotReader.dropStats()
+    }
+    if (pushedPartitionFilters.nonEmpty) {
+      
snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
+    }
+    if (pushedDataFilters.nonEmpty) {
+      snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
     }
+    snapshotReader.withDataFileNameFilter(fileName => 
filteredFileNames.contains(fileName))
+    dataSplits = snapshotReader.read().splits().asScala.collect { case s: 
DataSplit => s }.toArray
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
new file mode 100644
index 0000000000..e415e5cbf7
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/rowops/PaimonSparkCopyOnWriteOperation.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.rowops
+
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.PaimonBaseScanBuilder
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.spark.write.PaimonV2WriteBuilder
+import org.apache.paimon.table.FileStoreTable
+
+import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, 
RowLevelOperation, RowLevelOperationInfo, WriteBuilder}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PaimonSparkCopyOnWriteOperation(table: FileStoreTable, info: 
RowLevelOperationInfo)
+  extends RowLevelOperation {
+
+  private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
+  override def command(): RowLevelOperation.Command = info.command()
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= {
+    new PaimonBaseScanBuilder {
+      override lazy val table: FileStoreTable =
+        
PaimonSparkCopyOnWriteOperation.this.table.copy(options.asCaseSensitiveMap)
+
+      override def build(): Scan = {
+        val scan =
+          PaimonCopyOnWriteScan(table, requiredSchema, pushedPartitionFilters, 
pushedDataFilters)
+        PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
+        scan
+      }
+    }
+  }
+
+  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+    val options = Options.fromMap(info.options)
+    val builder = new PaimonV2WriteBuilder(table, info.schema(), options)
+    assert(copyOnWriteScan.isDefined)
+    builder.overwriteFiles(copyOnWriteScan.get)
+  }
+
+  override def requiredMetadataAttributes(): Array[NamedReference] = {
+    Array(Expressions.column(FILE_PATH_COLUMN))
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
index dda04245cf..3d37e7efca 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.write
 
-import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
 import org.apache.paimon.table.Table
 
 import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, 
SupportsOverwrite, WriteBuilder}
@@ -30,48 +30,35 @@ abstract class BaseV2WriteBuilder(table: Table)
   with SupportsOverwrite
   with SupportsDynamicOverwrite {
 
-  protected var overwriteDynamic = false
+  protected var overwriteDynamic: Option[Boolean] = None
   protected var overwritePartitions: Option[Map[String, String]] = None
-
-  protected var isOverwriteFiles = false
   protected var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
 
-  def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): WriteBuilder = {
-    this.isOverwriteFiles = true
-    this.copyOnWriteScan = scan
+  def overwriteFiles(copyOnWriteScan: PaimonCopyOnWriteScan): WriteBuilder = {
+    assert(overwriteDynamic.isEmpty && overwritePartitions.isEmpty)
+    this.copyOnWriteScan = Some(copyOnWriteScan)
     this
   }
 
   override def overwrite(filters: Array[Filter]): WriteBuilder = {
-    if (overwriteDynamic) {
-      throw new IllegalArgumentException("Cannot overwrite dynamically and by 
filter both")
-    }
-
+    assert(overwriteDynamic.isEmpty && copyOnWriteScan.isEmpty)
     failIfCanNotOverwrite(filters)
 
-    val conjunctiveFilters = if (filters.nonEmpty) {
-      Some(filters.reduce((l, r) => And(l, r)))
+    overwriteDynamic = Some(false)
+    val conjunctiveFilters = filters.reduce((l, r) => And(l, r))
+    if (isTruncate(conjunctiveFilters)) {
+      overwritePartitions = Some(Map.empty[String, String])
     } else {
-      None
+      overwritePartitions = Some(
+        convertPartitionFilterToMap(conjunctiveFilters, partitionRowType()))
     }
-
-    if (isTruncate(conjunctiveFilters.get)) {
-      overwritePartitions = Option.apply(Map.empty[String, String])
-    } else {
-      overwritePartitions =
-        Option.apply(convertPartitionFilterToMap(conjunctiveFilters.get, 
partitionRowType()))
-    }
-
     this
   }
 
   override def overwriteDynamicPartitions(): WriteBuilder = {
-    if (overwritePartitions.exists(_.nonEmpty)) {
-      throw new IllegalArgumentException("Cannot overwrite dynamically and by 
filter both")
-    }
-
-    overwriteDynamic = true
-    overwritePartitions = Option.apply(Map.empty[String, String])
+    assert(overwritePartitions.isEmpty && copyOnWriteScan.isEmpty)
+    overwriteDynamic = Some(true)
+    overwritePartitions = Some(Map.empty[String, String])
     this
   }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
new file mode 100644
index 0000000000..589ba17451
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonBatchWrite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
+import org.apache.paimon.spark.catalyst.Compatibility
+import org.apache.paimon.spark.commands.SparkDataFileMeta
+import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageImpl}
+
+import org.apache.spark.sql.PaimonSparkSession
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.types.StructType
+
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+case class PaimonBatchWrite(
+    table: FileStoreTable,
+    writeSchema: StructType,
+    dataSchema: StructType,
+    overwritePartitions: Option[Map[String, String]],
+    copyOnWriteScan: Option[PaimonCopyOnWriteScan])
+  extends BatchWrite
+  with WriteHelper {
+
+  protected val metricRegistry = SparkMetricRegistry()
+
+  protected val batchWriteBuilder: BatchWriteBuilder = {
+    val builder = table.newBatchWriteBuilder()
+    overwritePartitions.foreach(partitions => 
builder.withOverwrite(partitions.asJava))
+    builder
+  }
+
+  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
+    val fullCompactionDeltaCommits: Option[Int] =
+      Option.apply(coreOptions.fullCompactionDeltaCommits())
+    (_: Int, _: Long) => {
+      PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema, 
fullCompactionDeltaCommits)
+    }
+  }
+
+  override def useCommitCoordinator(): Boolean = false
+
+  override def commit(messages: Array[WriterCommitMessage]): Unit = {
+    logInfo(s"Committing to table ${table.name()}")
+    val batchTableCommit = batchWriteBuilder.newCommit()
+    batchTableCommit.withMetricRegistry(metricRegistry)
+    val addCommitMessage = WriteTaskResult.merge(messages)
+    val deletedCommitMessage = copyOnWriteScan match {
+      case Some(scan) => buildDeletedCommitMessage(scan.scannedFiles)
+      case None => Seq.empty
+    }
+    val commitMessages = addCommitMessage ++ deletedCommitMessage
+    try {
+      val start = System.currentTimeMillis()
+      batchTableCommit.commit(commitMessages.asJava)
+      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+    } finally {
+      batchTableCommit.close()
+    }
+    postDriverMetrics()
+    postCommit(commitMessages)
+  }
+
+  // Spark support v2 write driver metrics since 4.0, see 
https://github.com/apache/spark/pull/48573
+  // To ensure compatibility with 3.x, manually post driver metrics here 
instead of using Spark's API.
+  protected def postDriverMetrics(): Unit = {
+    val spark = PaimonSparkSession.active
+    // todo: find a more suitable way to get metrics.
+    val commitMetrics = metricRegistry.buildSparkCommitMetrics()
+    val executionId = 
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val executionMetrics = Compatibility.getExecutionMetrics(spark, 
executionId.toLong).distinct
+    val metricUpdates = executionMetrics.flatMap {
+      m =>
+        commitMetrics.find(x => 
m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
+          case Some(customTaskMetric) => Some((m.accumulatorId, 
customTaskMetric.value()))
+          case None => None
+        }
+    }
+    SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, 
executionId, metricUpdates)
+  }
+
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {
+    // TODO clean uncommitted files
+  }
+
+  private def buildDeletedCommitMessage(
+      deletedFiles: Seq[SparkDataFileMeta]): Seq[CommitMessage] = {
+    deletedFiles
+      .groupBy(f => (f.partition, f.bucket))
+      .map {
+        case ((partition, bucket), files) =>
+          val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+
+          new CommitMessageImpl(
+            partition,
+            bucket,
+            files.head.totalBuckets,
+            new DataIncrement(
+              Collections.emptyList[DataFileMeta],
+              deletedDataFileMetas,
+              Collections.emptyList[DataFileMeta]),
+            new CompactIncrement(
+              Collections.emptyList[DataFileMeta],
+              Collections.emptyList[DataFileMeta],
+              Collections.emptyList[DataFileMeta])
+          )
+      }
+      .toSeq
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
new file mode 100644
index 0000000000..d5291fe5d8
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2DataWriter.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.write
+
+import org.apache.paimon.spark.{SparkInternalRowWrapper, SparkUtils}
+import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
TableWriteImpl}
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.metric.CustomTaskMetric
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.JavaConverters._
+
+case class PaimonV2DataWriter(
+    writeBuilder: BatchWriteBuilder,
+    writeSchema: StructType,
+    dataSchema: StructType,
+    fullCompactionDeltaCommits: Option[Int],
+    batchId: Option[Long] = None)
+  extends abstractInnerTableDataWrite[InternalRow]
+  with InnerTableV2DataWrite {
+
+  private val ioManager = SparkUtils.createIOManager()
+
+  private val metricRegistry = SparkMetricRegistry()
+
+  val write: TableWriteImpl[InternalRow] = {
+    writeBuilder
+      .newWrite()
+      .withIOManager(ioManager)
+      .withMetricRegistry(metricRegistry)
+      .asInstanceOf[TableWriteImpl[InternalRow]]
+  }
+
+  private val rowConverter: InternalRow => SparkInternalRowWrapper = {
+    val numFields = writeSchema.fields.length
+    val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, 
dataSchema, numFields)
+    record => reusableWrapper.replace(record)
+  }
+
+  override def write(record: InternalRow): Unit = {
+    postWrite(write.writeAndReturn(rowConverter.apply(record)))
+  }
+
+  override def commitImpl(): Seq[CommitMessage] = {
+    write.prepareCommit().asScala.toSeq
+  }
+
+  override def abort(): Unit = close()
+
+  override def close(): Unit = {
+    try {
+      write.close()
+      ioManager.close()
+    } catch {
+      case e: Exception => throw new RuntimeException(e)
+    }
+  }
+
+  override def currentMetricsValues(): Array[CustomTaskMetric] = {
+    metricRegistry.buildSparkWriteMetrics()
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 5a46e208f5..e2ead2f726 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -18,38 +18,23 @@
 
 package org.apache.paimon.spark.write
 
-import org.apache.paimon.CoreOptions
-import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark._
-import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.commands.{SchemaHelper, SparkDataFileMeta}
-import 
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
-import org.apache.paimon.spark.metric.SparkMetricRegistry
-import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.rowops.PaimonCopyOnWriteScan
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, 
CommitMessageImpl, TableWriteImpl}
-import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.PaimonSparkSession
-import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.connector.distributions.Distribution
 import org.apache.spark.sql.connector.expressions.SortOrder
-import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write._
-import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.types.StructType
 
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
 class PaimonV2Write(
     override val originTable: FileStoreTable,
-    overwriteDynamic: Boolean,
     overwritePartitions: Option[Map[String, String]],
+    copyOnWriteScan: Option[PaimonCopyOnWriteScan],
     dataSchema: StructType,
     options: Options
 ) extends Write
@@ -57,25 +42,7 @@ class PaimonV2Write(
   with SchemaHelper
   with Logging {
 
-  assert(
-    !(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
-    "Cannot overwrite dynamically and by filter both")
-
-  private var isOverwriteFiles = false
-
-  private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
-
   private val writeSchema = mergeSchema(dataSchema, options)
-
-  def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): PaimonV2Write = {
-    this.isOverwriteFiles = true
-    this.copyOnWriteScan = scan
-    this
-  }
-
-  updateTableWithOptions(
-    Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> 
overwriteDynamic.toString))
-
   private val writeRequirement = PaimonWriteRequirement(table)
 
   override def requiredDistribution(): Distribution = {
@@ -91,11 +58,7 @@ class PaimonV2Write(
   }
 
   override def toBatch: BatchWrite = {
-    if (isOverwriteFiles) {
-      CopyOnWriteBatchWrite(table, writeSchema, dataSchema, 
overwritePartitions, copyOnWriteScan)
-    } else {
-      PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
-    }
+    PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions, 
copyOnWriteScan)
   }
 
   override def supportedCustomMetrics(): Array[CustomMetric] = {
@@ -113,6 +76,7 @@ class PaimonV2Write(
   }
 
   override def toString: String = {
+    val overwriteDynamic = table.coreOptions().dynamicPartitionOverwrite()
     val overwriteDynamicStr = if (overwriteDynamic) {
       ", overwriteDynamic=true"
     } else {
@@ -120,211 +84,11 @@ class PaimonV2Write(
     }
     val overwritePartitionsStr = overwritePartitions match {
       case Some(partitions) if partitions.nonEmpty => s", 
overwritePartitions=$partitions"
-      case Some(_) => ", overwriteTable=true"
-      case None => ""
+      case Some(_) if !overwriteDynamic => ", overwriteTable=true"
+      case _ => ""
     }
     
s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
   }
 
   override def description(): String = toString
 }
-
-private case class PaimonBatchWrite(
-    table: FileStoreTable,
-    writeSchema: StructType,
-    dataSchema: StructType,
-    overwritePartitions: Option[Map[String, String]])
-  extends PaimonBatchWriteBase(table, writeSchema, dataSchema, 
overwritePartitions) {}
-
-abstract class PaimonBatchWriteBase(
-    table: FileStoreTable,
-    writeSchema: StructType,
-    dataSchema: StructType,
-    overwritePartitions: Option[Map[String, String]])
-  extends BatchWrite
-  with WriteHelper {
-
-  protected val metricRegistry = SparkMetricRegistry()
-
-  protected val batchWriteBuilder = {
-    val builder = table.newBatchWriteBuilder()
-    overwritePartitions.foreach(partitions => 
builder.withOverwrite(partitions.asJava))
-    builder
-  }
-
-  override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory = {
-    val fullCompactionDeltaCommits: Option[Int] =
-      Option.apply(coreOptions.fullCompactionDeltaCommits())
-    WriterFactory(writeSchema, dataSchema, batchWriteBuilder, 
fullCompactionDeltaCommits)
-  }
-
-  override def useCommitCoordinator(): Boolean = false
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    logInfo(s"Committing to table ${table.name()}")
-    val batchTableCommit = batchWriteBuilder.newCommit()
-    batchTableCommit.withMetricRegistry(metricRegistry)
-    val commitMessages = WriteTaskResult.merge(messages)
-    try {
-      val start = System.currentTimeMillis()
-      batchTableCommit.commit(commitMessages.asJava)
-      logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
-    } finally {
-      batchTableCommit.close()
-    }
-    postDriverMetrics()
-    postCommit(commitMessages)
-  }
-
-  override def abort(messages: Array[WriterCommitMessage]): Unit = {
-    // TODO clean uncommitted files
-  }
-
-  // Spark support v2 write driver metrics since 4.0, see 
https://github.com/apache/spark/pull/48573
-  // To ensure compatibility with 3.x, manually post driver metrics here 
instead of using Spark's API.
-  protected def postDriverMetrics(): Unit = {
-    val spark = PaimonSparkSession.active
-    // todo: find a more suitable way to get metrics.
-    val commitMetrics = metricRegistry.buildSparkCommitMetrics()
-    val executionId = 
spark.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    val executionMetrics = Compatibility.getExecutionMetrics(spark, 
executionId.toLong).distinct
-    val metricUpdates = executionMetrics.flatMap {
-      m =>
-        commitMetrics.find(x => 
m.metricType.toLowerCase.contains(x.name.toLowerCase)) match {
-          case Some(customTaskMetric) => Some((m.accumulatorId, 
customTaskMetric.value()))
-          case None => None
-        }
-    }
-    SQLMetrics.postDriverMetricsUpdatedByValue(spark.sparkContext, 
executionId, metricUpdates)
-  }
-}
-
-private case class CopyOnWriteBatchWrite(
-    table: FileStoreTable,
-    writeSchema: StructType,
-    dataSchema: StructType,
-    overwritePartitions: Option[Map[String, String]],
-    scan: Option[PaimonCopyOnWriteScan])
-  extends PaimonBatchWriteBase(table, writeSchema, dataSchema, 
overwritePartitions) {
-
-  override def commit(messages: Array[WriterCommitMessage]): Unit = {
-    logInfo(s"CopyOnWrite committing to table ${table.name()}")
-
-    val batchTableCommit = batchWriteBuilder.newCommit()
-
-    try {
-      if (scan.isEmpty) {
-        batchTableCommit.truncateTable()
-      } else {
-        val touchedFiles = candidateFiles(scan.get.dataSplits)
-        val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
-        val addCommitMessages = WriteTaskResult.merge(messages)
-        val commitMessages = addCommitMessages ++ deletedCommitMessage
-
-        batchTableCommit.withMetricRegistry(metricRegistry)
-        val start = System.currentTimeMillis()
-        batchTableCommit.commit(commitMessages.asJava)
-        logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
-        postCommit(commitMessages)
-      }
-    } finally {
-      batchTableCommit.close()
-      postDriverMetrics()
-    }
-  }
-
-  private def candidateFiles(candidateDataSplits: Seq[DataSplit]): 
Array[SparkDataFileMeta] = {
-    val totalBuckets = coreOptions.bucket()
-    candidateDataSplits
-      .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, 
totalBuckets))
-      .toArray
-  }
-
-  private def buildDeletedCommitMessage(
-      deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
-    deletedFiles
-      .groupBy(f => (f.partition, f.bucket))
-      .map {
-        case ((partition, bucket), files) =>
-          val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
-
-          new CommitMessageImpl(
-            partition,
-            bucket,
-            files.head.totalBuckets,
-            new DataIncrement(
-              Collections.emptyList[DataFileMeta],
-              deletedDataFileMetas,
-              Collections.emptyList[DataFileMeta]),
-            new CompactIncrement(
-              Collections.emptyList[DataFileMeta],
-              Collections.emptyList[DataFileMeta],
-              Collections.emptyList[DataFileMeta])
-          )
-      }
-      .toSeq
-  }
-}
-
-private case class WriterFactory(
-    writeSchema: StructType,
-    dataSchema: StructType,
-    batchWriteBuilder: BatchWriteBuilder,
-    fullCompactionDeltaCommits: Option[Int])
-  extends DataWriterFactory {
-
-  override def createWriter(partitionId: Int, taskId: Long): 
DataWriter[InternalRow] = {
-    PaimonV2DataWriter(batchWriteBuilder, writeSchema, dataSchema, 
fullCompactionDeltaCommits)
-  }
-}
-
-private case class PaimonV2DataWriter(
-    writeBuilder: BatchWriteBuilder,
-    writeSchema: StructType,
-    dataSchema: StructType,
-    fullCompactionDeltaCommits: Option[Int],
-    batchId: Option[Long] = None)
-  extends abstractInnerTableDataWrite[InternalRow]
-  with InnerTableV2DataWrite {
-
-  private val ioManager = SparkUtils.createIOManager()
-
-  private val metricRegistry = SparkMetricRegistry()
-
-  val write: TableWriteImpl[InternalRow] = {
-    writeBuilder
-      .newWrite()
-      .withIOManager(ioManager)
-      .withMetricRegistry(metricRegistry)
-      .asInstanceOf[TableWriteImpl[InternalRow]]
-  }
-
-  private val rowConverter: InternalRow => SparkInternalRowWrapper = {
-    val numFields = writeSchema.fields.length
-    val reusableWrapper = new SparkInternalRowWrapper(-1, writeSchema, 
dataSchema, numFields)
-    record => reusableWrapper.replace(record)
-  }
-
-  override def write(record: InternalRow): Unit = {
-    postWrite(write.writeAndReturn(rowConverter.apply(record)))
-  }
-
-  override def commitImpl(): Seq[CommitMessage] = {
-    write.prepareCommit().asScala.toSeq
-  }
-
-  override def abort(): Unit = close()
-
-  override def close(): Unit = {
-    try {
-      write.close()
-      ioManager.close()
-    } catch {
-      case e: Exception => throw new RuntimeException(e)
-    }
-  }
-
-  override def currentMetricsValues(): Array[CustomTaskMetric] = {
-    metricRegistry.buildSparkWriteMetrics()
-  }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
index f8036ff40a..91f4f861ce 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
@@ -18,23 +18,25 @@
 
 package org.apache.paimon.spark.write
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.options.Options
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.types.StructType
 
+import scala.collection.JavaConverters._
+
 class PaimonV2WriteBuilder(table: FileStoreTable, dataSchema: StructType, 
options: Options)
   extends BaseV2WriteBuilder(table) {
 
-  override def build = {
-    val paimonV2Write =
-      new PaimonV2Write(table, overwriteDynamic, overwritePartitions, 
dataSchema, options)
-    if (isOverwriteFiles) {
-      paimonV2Write.overwriteFiles(copyOnWriteScan)
-    } else {
-      paimonV2Write
+  override def build: PaimonV2Write = {
+    val finalTable = overwriteDynamic match {
+      case Some(o) =>
+        table.copy(Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> 
o.toString).asJava)
+      case _ => table
     }
+    new PaimonV2Write(finalTable, overwritePartitions, copyOnWriteScan, 
dataSchema, options)
   }
 
   override def partitionRowType(): RowType = 
table.schema().logicalPartitionType()
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
index cfd4750d0b..0e6209a598 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
@@ -46,5 +46,4 @@ class PaimonWriteBuilder(table: FileStoreTable, options: 
Options)
     this.saveMode = Overwrite(conjunctiveFilters)
     this
   }
-
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 3113b905f8..9daa975cba 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -75,6 +75,30 @@ abstract class DeleteFromTableTestBase extends 
PaimonSparkTestBase {
     )
   }
 
+  test(
+    s"Paimon Delete: append-only table, no match and full delete scenarios 
with partitioned table") {
+    spark.sql(s"""
+                 |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED 
BY (dt)
+                 |""".stripMargin)
+
+    spark.sql("""
+                |INSERT INTO T
+                |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), 
(4, 'd', '2025')
+                |""".stripMargin)
+
+    spark.sql("DELETE FROM T WHERE name = 'e'")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d", 
"2025")).toDF()
+    )
+
+    spark.sql("DELETE FROM T")
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY id"),
+      spark.emptyDataFrame
+    )
+  }
+
   test(s"Paimon Delete: append-only table with partition") {
     spark.sql(s"""
                  |CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED 
BY (dt)

Reply via email to