This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6bb6f49dbe [spark] Support DELETE on Paimon append-only table in spark V2 write (#6704)
6bb6f49dbe is described below
commit 6bb6f49dbe8ebc3249a1b8ea43da352d3905737b
Author: Kerwin Zhang <[email protected]>
AuthorDate: Wed Dec 17 11:04:14 2025 +0800
[spark] Support DELETE on Paimon append-only table in spark V2 write (#6704)
---
.../scala/org/apache/paimon/spark/SparkTable.scala | 6 --
.../scala/org/apache/paimon/spark/SparkTable.scala | 6 --
.../scala/org/apache/paimon/spark/SparkTable.scala | 6 --
.../paimon/spark/sql/V2DeleteFromTableTest.scala} | 18 ++--
.../paimon/spark/PaimonCopyOnWriteScan.scala | 104 +++++++++++++++++++
.../spark/PaimonSparkCopyOnWriteOperation.scala | 104 +++++++++++++++++++
...a => PaimonSparkRowLevelOperationBuilder.scala} | 13 +--
.../apache/paimon/spark/PaimonSparkTableBase.scala | 2 +-
.../scala/org/apache/paimon/spark/SparkTable.scala | 12 ++-
.../catalyst/analysis/PaimonDeleteTable.scala | 34 ++++++-
.../paimon/spark/write/BaseV2WriteBuilder.scala | 10 ++
.../apache/paimon/spark/write/PaimonV2Write.scala | 111 +++++++++++++++++++--
.../paimon/spark/write/PaimonV2WriteBuilder.scala | 13 ++-
.../paimon/spark/sql/DeleteFromTableTestBase.scala | 23 +++++
14 files changed, 414 insertions(+), 48 deletions(-)
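
For reference, a minimal usage sketch of the feature added by this commit (illustrative only and not part of the patch: it assumes a Spark session whose catalog is already configured for Paimon, and the table name and data are hypothetical, mirroring the new V2DeleteFromTableTest and DeleteFromTableTestBase cases below):

import org.apache.spark.sql.SparkSession

// Same flag the new V2DeleteFromTableTest sets via SparkConf.
val spark = SparkSession
  .builder()
  .config("spark.paimon.write.use-v2-write", "true")
  .getOrCreate()

// An append-only (no primary key) table: with V2 write enabled, DELETE is now
// planned as a Spark row-level operation (PaimonSparkCopyOnWriteOperation)
// instead of falling back to the V1 DeleteFromPaimonTableCommand.
spark.sql("CREATE TABLE T (id INT, name STRING, dt STRING)")
spark.sql("INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024')")
spark.sql("DELETE FROM T WHERE dt = '2024'")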
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/SparkTable.scala
similarity index 85%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..284426b615 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,9 +22,3 @@ import org.apache.paimon.table.Table
/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/SparkTable.scala
similarity index 85%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..284426b615 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,9 +22,3 @@ import org.apache.paimon.table.Table
/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/SparkTable.scala
similarity index 85%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..284426b615 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-3.4/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -22,9 +22,3 @@ import org.apache.paimon.table.Table
/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
similarity index 66%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
index 69f134196f..947fe503b1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/V2DeleteFromTableTest.scala
@@ -16,15 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.sql
-import org.apache.paimon.table.Table
+import org.apache.spark.SparkConf
-/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
-
-case class SparkIcebergTable(table: Table) extends BaseTable
-
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
+class V2DeleteFromTableTest extends DeleteFromTableTestBase {
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.paimon.write.use-v2-write", "true")
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
new file mode 100644
index 0000000000..7e45f4be1b
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.table.{FileStoreTable, InnerTable}
+import org.apache.paimon.table.source.{DataSplit, Split}
+
+import org.apache.spark.sql.PaimonUtils
+import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
+import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
+import org.apache.spark.sql.connector.read.{Batch, SupportsRuntimeV2Filtering}
+import org.apache.spark.sql.sources.{Filter, In}
+import org.apache.spark.sql.types.StructType
+
+import java.nio.file.Paths
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class PaimonCopyOnWriteScan(
+ table: InnerTable,
+ requiredSchema: StructType,
+ pushedPartitionFilters: Seq[PartitionPredicate],
+ pushedDataFilters: Seq[Predicate],
+ bucketedScanDisabled: Boolean = false)
+ extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
+ with SupportsRuntimeV2Filtering {
+
+ var filteredLocations: mutable.Set[String] = mutable.Set[String]()
+
+ var filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+
+ var dataSplits: Array[DataSplit] = Array()
+
+ def disableBucketedScan(): PaimonCopyOnWriteScan = {
+ copy(bucketedScanDisabled = true)
+ }
+
+ override def filterAttributes(): Array[NamedReference] = {
+ Array(Expressions.column(FILE_PATH_COLUMN))
+ }
+
+ override def filter(predicates: Array[SparkPredicate]): Unit = {
+ val runtimefilters: Array[Filter] = predicates.flatMap(PaimonUtils.filterV2ToV1)
+ for (filter <- runtimefilters) {
+ filter match {
+ case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
+ for (value <- in.values) {
+ val location = value.asInstanceOf[String]
+ filteredLocations.add(location)
+ filteredFileNames.add(Paths.get(location).getFileName.toString)
+ }
+ case _ => logWarning("Unsupported runtime filter")
+ }
+ }
+
+ table match {
+ case fileStoreTable: FileStoreTable =>
+ val snapshotReader = fileStoreTable.newSnapshotReader()
+ if (fileStoreTable.coreOptions().manifestDeleteFileDropStats()) {
+ snapshotReader.dropStats()
+ }
+
+ pushedPartitionFilters.foreach(snapshotReader.withPartitionFilter)
+
+ pushedDataFilters.foreach(snapshotReader.withFilter)
+
+ snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
+
+ dataSplits =
+ snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
+
+ case _ => throw new RuntimeException("Only FileStoreTable support.")
+ }
+
+ }
+
+ override def toBatch: Batch = {
+ PaimonBatch(
+ getInputPartitions(dataSplits.asInstanceOf[Array[Split]]),
+ readBuilder,
+ coreOptions.blobAsDescriptor(),
+ metadataColumns)
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
new file mode 100644
index 0000000000..96256468e9
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions.BucketFunctionType
+import org.apache.paimon.options.Options
+import org.apache.paimon.spark.catalog.functions.BucketFunction
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
+import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.spark.write.PaimonV2WriteBuilder
+import org.apache.paimon.table.{FileStoreTable, InnerTable, Table}
+import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
+
+import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, RowLevelOperation, RowLevelOperationInfo, WriteBuilder}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class PaimonSparkCopyOnWriteOperation(table: Table, info: RowLevelOperationInfo)
+ extends RowLevelOperation {
+
+ private lazy val coreOptions = new CoreOptions(table.options())
+
+ private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
+ private lazy val useV2Write: Boolean = {
+ val v2WriteConfigured = OptionUtils.useV2Write()
+ v2WriteConfigured && supportsV2Write
+ }
+
+ private def supportsV2Write: Boolean = {
+ coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
+ table match {
+ case storeTable: FileStoreTable =>
+ storeTable.bucketMode() match {
+ case HASH_FIXED => BucketFunction.supportsTable(storeTable)
+ case BUCKET_UNAWARE | POSTPONE_MODE => true
+ case _ => false
+ }
+
+ case _ => false
+ }
+ } && coreOptions.clusteringColumns().isEmpty
+ }
+
+ override def command(): RowLevelOperation.Command = info.command()
+
+ override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
+ table match {
+ case t: InnerTable =>
+ new PaimonScanBuilder(t.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable]) {
+ override def build(): Scan = {
+ val scan =
+ PaimonCopyOnWriteScan(t, requiredSchema, pushedPartitionFilters, pushedDataFilters)
+ PaimonSparkCopyOnWriteOperation.this.copyOnWriteScan = Option(scan)
+ scan
+ }
+ }
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Scan is only supported for InnerTable. " +
+ s"Actual table type:
${Option(table).map(_.getClass.getSimpleName).getOrElse("null")}"
+ )
+ }
+ }
+
+ override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
+ table match {
+ case fileStoreTable: FileStoreTable if useV2Write =>
+ val options = Options.fromMap(info.options)
+ val builder = new PaimonV2WriteBuilder(fileStoreTable, info.schema(), options)
+ builder.overwriteFiles(copyOnWriteScan)
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"Write operation is only supported for FileStoreTable with V2 write
enabled. " +
+ s"Actual table type: ${table.getClass.getSimpleName}, useV2Write:
$useV2Write"
+ )
+ }
+ }
+
+ override def requiredMetadataAttributes(): Array[NamedReference] = {
+ val attributes: Seq[NamedReference] = Seq.empty
+ val updatedAttributes = attributes :+ Expressions.column(FILE_PATH_COLUMN)
+ updatedAttributes.toArray
+ }
+
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
similarity index 71%
copy from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
copy to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
index 69f134196f..57f805cb13 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkRowLevelOperationBuilder.scala
@@ -20,11 +20,12 @@ package org.apache.paimon.spark
import org.apache.paimon.table.Table
-/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
+import org.apache.spark.sql.connector.write.{RowLevelOperation, RowLevelOperationBuilder, RowLevelOperationInfo}
-case class SparkIcebergTable(table: Table) extends BaseTable
+class PaimonSparkRowLevelOperationBuilder(table: Table, info: RowLevelOperationInfo)
+ extends RowLevelOperationBuilder {
-case class SparkLanceTable(table: Table) extends BaseTable
-
-case class SparkObjectTable(table: Table) extends BaseTable
+ override def build(): RowLevelOperation = {
+ new PaimonSparkCopyOnWriteOperation(table, info)
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 859becf546..22758dea0f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -45,7 +45,7 @@ abstract class PaimonSparkTableBase(val table: Table)
lazy val coreOptions = new CoreOptions(table.options())
- private lazy val useV2Write: Boolean = {
+ lazy val useV2Write: Boolean = {
val v2WriteConfigured = OptionUtils.useV2Write()
v2WriteConfigured && supportsV2Write
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index 69f134196f..f0e8f382fe 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -20,8 +20,18 @@ package org.apache.paimon.spark
import org.apache.paimon.table.Table
+import org.apache.spark.sql.connector.catalog.SupportsRowLevelOperations
+import org.apache.spark.sql.connector.write.{RowLevelOperationBuilder, RowLevelOperationInfo}
+
/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
-case class SparkTable(override val table: Table) extends PaimonSparkTableBase(table) {}
+case class SparkTable(override val table: Table)
+ extends PaimonSparkTableBase(table)
+ with SupportsRowLevelOperations {
+ override def newRowLevelOperationBuilder(
+ rowLevelOperationInfo: RowLevelOperationInfo): RowLevelOperationBuilder = {
+ new PaimonSparkRowLevelOperationBuilder(table, rowLevelOperationInfo)
+ }
+}
case class SparkIcebergTable(table: Table) extends BaseTable
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 4a895a517d..15781cff90 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.catalyst.analysis
+import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
import org.apache.paimon.table.FileStoreTable
@@ -28,11 +29,42 @@ import scala.collection.JavaConverters._
object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
+ /**
+ * Determines if DataSourceV2 delete is not supported for the given table. DataSourceV2 delete is
+ * not supported in the following scenarios:
+ * - Spark version is 3.4 or earlier
+ * - Table does not use V2 write
+ * - Row tracking is enabled
+ * - Deletion vectors are enabled
+ * - Table has primary keys defined
+ * - Table is not a FileStoreTable
+ * - Data evolution is enabled
+ */
+ private def shouldFallbackToV1Delete(table: SparkTable): Boolean = {
+ val baseTable = table.getTable
+
+ val baseConditions = org.apache.spark.SPARK_VERSION <= "3.4" ||
+ !table.useV2Write ||
+ table.coreOptions.rowTrackingEnabled() ||
+ table.coreOptions.deletionVectorsEnabled() ||
+ !baseTable.primaryKeys().isEmpty
+
+ baseConditions || {
+ baseTable match {
+ case paimonTable: FileStoreTable =>
+ paimonTable.coreOptions().dataEvolutionEnabled()
+ case _ =>
+ true
+ }
+ }
+ }
+
override val operation: RowLevelOp = Delete
override def apply(plan: LogicalPlan): LogicalPlan = {
plan.resolveOperators {
- case d @ DeleteFromTable(PaimonRelation(table), condition) if d.resolved =>
+ case d @ DeleteFromTable(PaimonRelation(table), condition)
+ if d.resolved && shouldFallbackToV1Delete(table) =>
checkPaimonTable(table.getTable)
table.getTable match {
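
Put differently, the scaladoc above means the new V2 path is taken only on Spark 3.5+, for append-only FileStoreTables using V2 write, with no row tracking, deletion vectors, primary keys, or data evolution. An illustrative sketch of which tables stay on the V1 command (hypothetical table names; the properties are standard Paimon options, not introduced by this commit):

// Falls back to DeleteFromPaimonTableCommand: primary keys are defined.
spark.sql("CREATE TABLE pk_t (id INT, v STRING) TBLPROPERTIES ('primary-key' = 'id')")
// Falls back: deletion vectors are enabled.
spark.sql("CREATE TABLE dv_t (id INT, v STRING) TBLPROPERTIES ('deletion-vectors.enabled' = 'true')")
// Eligible for the V2 copy-on-write DELETE (with spark.paimon.write.use-v2-write=true).
spark.sql("CREATE TABLE ao_t (id INT, v STRING)")
spark.sql("DELETE FROM ao_t WHERE id = 1")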
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
index 977e4510cb..17c8e9800f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.write
+import org.apache.paimon.spark.PaimonCopyOnWriteScan
import org.apache.paimon.table.Table
import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsOverwrite, WriteBuilder}
@@ -32,6 +33,15 @@ abstract class BaseV2WriteBuilder(table: Table)
protected var overwriteDynamic = false
protected var overwritePartitions: Option[Map[String, String]] = None
+ protected var isOverwriteFiles = false
+ protected var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
+ def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): WriteBuilder = {
+ this.isOverwriteFiles = true
+ this.copyOnWriteScan = scan
+ this
+ }
+
override def overwrite(filters: Array[Filter]): WriteBuilder = {
if (overwriteDynamic) {
throw new IllegalArgumentException("Cannot overwrite dynamically and by filter both")
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 79c2d5c8fc..ba89e84e2e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -19,13 +19,16 @@
package org.apache.paimon.spark.write
import org.apache.paimon.CoreOptions
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
import org.apache.paimon.options.Options
import org.apache.paimon.spark._
import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.commands.SchemaHelper
+import org.apache.paimon.spark.commands.{SchemaHelper, SparkDataFileMeta}
+import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, TableWriteImpl}
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, TableWriteImpl}
+import org.apache.paimon.table.source.DataSplit
import org.apache.spark.internal.Logging
import org.apache.spark.sql.PaimonSparkSession
@@ -38,6 +41,8 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
+import java.util.Collections
+
import scala.collection.JavaConverters._
class PaimonV2Write(
@@ -55,8 +60,18 @@ class PaimonV2Write(
!(overwriteDynamic && overwritePartitions.exists(_.nonEmpty)),
"Cannot overwrite dynamically and by filter both")
+ private var isOverwriteFiles = false
+
+ private var copyOnWriteScan: Option[PaimonCopyOnWriteScan] = None
+
private val writeSchema = mergeSchema(dataSchema, options)
+ def overwriteFiles(scan: Option[PaimonCopyOnWriteScan]): PaimonV2Write = {
+ this.isOverwriteFiles = true
+ this.copyOnWriteScan = scan
+ this
+ }
+
updateTableWithOptions(
Map(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key -> overwriteDynamic.toString))
@@ -74,8 +89,13 @@ class PaimonV2Write(
ordering
}
- override def toBatch: BatchWrite =
- PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
+ override def toBatch: BatchWrite = {
+ if (isOverwriteFiles) {
+ CopyOnWriteBatchWrite(table, writeSchema, dataSchema, overwritePartitions, copyOnWriteScan)
+ } else {
+ PaimonBatchWrite(table, writeSchema, dataSchema, overwritePartitions)
+ }
+ }
override def supportedCustomMetrics(): Array[CustomMetric] = {
Array(
@@ -113,12 +133,19 @@ private case class PaimonBatchWrite(
writeSchema: StructType,
dataSchema: StructType,
overwritePartitions: Option[Map[String, String]])
+ extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions) {}
+
+abstract class PaimonBatchWriteBase(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]])
extends BatchWrite
with WriteHelper {
- private val metricRegistry = SparkMetricRegistry()
+ protected val metricRegistry = SparkMetricRegistry()
- private val batchWriteBuilder = {
+ protected val batchWriteBuilder = {
val builder = table.newBatchWriteBuilder()
overwritePartitions.foreach(partitions => builder.withOverwrite(partitions.asJava))
builder
@@ -154,7 +181,7 @@ private case class PaimonBatchWrite(
// Spark support v2 write driver metrics since 4.0, see https://github.com/apache/spark/pull/48573
// To ensure compatibility with 3.x, manually post driver metrics here instead of using Spark's API.
- private def postDriverMetrics(): Unit = {
+ protected def postDriverMetrics(): Unit = {
val spark = PaimonSparkSession.active
// todo: find a more suitable way to get metrics.
val commitMetrics = metricRegistry.buildSparkCommitMetrics()
@@ -171,6 +198,76 @@ private case class PaimonBatchWrite(
}
}
+private case class CopyOnWriteBatchWrite(
+ table: FileStoreTable,
+ writeSchema: StructType,
+ dataSchema: StructType,
+ overwritePartitions: Option[Map[String, String]],
+ scan: Option[PaimonCopyOnWriteScan])
+ extends PaimonBatchWriteBase(table, writeSchema, dataSchema, overwritePartitions) {
+
+ override def commit(messages: Array[WriterCommitMessage]): Unit = {
+ logInfo(s"CopyOnWrite committing to table ${table.name()}")
+
+ val batchTableCommit = batchWriteBuilder.newCommit()
+
+ try {
+ if (scan.isEmpty) {
+ batchTableCommit.truncateTable()
+ } else {
+ val touchedFiles = candidateFiles(scan.get.dataSplits)
+
+ val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+ val addCommitMessages = WriteTaskResult.merge(messages)
+
+ val commitMessages = addCommitMessages ++ deletedCommitMessage
+
+ batchTableCommit.withMetricRegistry(metricRegistry)
+ val start = System.currentTimeMillis()
+ batchTableCommit.commit(commitMessages.asJava)
+ logInfo(s"Committed in ${System.currentTimeMillis() - start} ms")
+ postCommit(commitMessages)
+ }
+ } finally {
+ batchTableCommit.close()
+ postDriverMetrics()
+ }
+ }
+
+ private def candidateFiles(candidateDataSplits: Seq[DataSplit]): Array[SparkDataFileMeta] = {
+ val totalBuckets = coreOptions.bucket()
+ candidateDataSplits
+ .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets))
+ .toArray
+ }
+
+ private def buildDeletedCommitMessage(
+ deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
+ deletedFiles
+ .groupBy(f => (f.partition, f.bucket))
+ .map {
+ case ((partition, bucket), files) =>
+ val deletedDataFileMetas = files.map(_.dataFileMeta).toList.asJava
+
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ files.head.totalBuckets,
+ new DataIncrement(
+ Collections.emptyList[DataFileMeta],
+ deletedDataFileMetas,
+ Collections.emptyList[DataFileMeta]),
+ new CompactIncrement(
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta],
+ Collections.emptyList[DataFileMeta])
+ )
+ }
+ .toSeq
+ }
+}
+
private case class WriterFactory(
writeSchema: StructType,
dataSchema: StructType,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
index b11ca13920..f8036ff40a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
@@ -22,15 +22,20 @@ import org.apache.paimon.options.Options
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.RowType
-import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsOverwrite, WriteBuilder}
-import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.types.StructType
class PaimonV2WriteBuilder(table: FileStoreTable, dataSchema: StructType, options: Options)
extends BaseV2WriteBuilder(table) {
- override def build =
- new PaimonV2Write(table, overwriteDynamic, overwritePartitions, dataSchema, options)
+ override def build = {
+ val paimonV2Write =
+ new PaimonV2Write(table, overwriteDynamic, overwritePartitions, dataSchema, options)
+ if (isOverwriteFiles) {
+ paimonV2Write.overwriteFiles(copyOnWriteScan)
+ } else {
+ paimonV2Write
+ }
+ }
override def partitionRowType(): RowType = table.schema().logicalPartitionType()
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index ea05d94dd3..3113b905f8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -52,6 +52,29 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
)
}
+ test(s"Paimon Delete: append-only table, no match and full delete
scenarios") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, name STRING, dt STRING)
+ |""".stripMargin)
+
+ spark.sql("""
+ |INSERT INTO T
+ |VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')
+ |""".stripMargin)
+
+ spark.sql("DELETE FROM T WHERE name = 'e'")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ Seq((1, "a", "2024"), (2, "b", "2024"), (3, "c", "2025"), (4, "d",
"2025")).toDF()
+ )
+
+ spark.sql("DELETE FROM T")
+ checkAnswer(
+ spark.sql("SELECT * FROM T ORDER BY id"),
+ spark.emptyDataFrame
+ )
+ }
+
test(s"Paimon Delete: append-only table with partition") {
spark.sql(s"""
|CREATE TABLE T (id INT, name STRING, dt STRING) PARTITIONED BY (dt)