This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3acf89b2d9 [spark] Refactor metadata only delete (#6852)
3acf89b2d9 is described below
commit 3acf89b2d94f2719c7f7892dd8747d038c5c660e
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Dec 22 10:43:43 2025 +0800
[spark] Refactor metadata only delete (#6852)
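
After this change, a DELETE that can be satisfied by metadata operations is planned as
TruncatePaimonTableWithFilterExec instead of rewriting data files: a DELETE with no condition
(or an always-true condition) becomes a full-table truncate, and a DELETE whose condition
references only partition columns becomes a partition truncate. A minimal sketch mirroring
the new test (table and column names are illustrative):

    spark.sql("CREATE TABLE t (id INT, name STRING, pt INT) PARTITIONED BY (pt)")
    spark.sql("INSERT INTO t VALUES (1, 'a', 1), (2, 'b', 2)")
    // the condition touches only the partition column `pt`, so this becomes a partition truncate
    spark.sql("DELETE FROM t WHERE pt = 1")
    // no condition at all becomes a full-table truncate
    spark.sql("DELETE FROM t")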
---
.../paimon/spark/PaimonPartitionManagement.scala | 15 ++++
.../apache/paimon/spark/PaimonSparkTableBase.scala | 9 ++-
.../spark/catalyst/analysis/PaimonAnalysis.scala | 5 +-
.../catalyst/analysis/PaimonDeleteTable.scala | 7 +-
.../analysis/PaimonIncompatiblePHRRules.scala | 54 --------------
.../analysis/expressions/ExpressionHelper.scala | 11 ---
...ptimizeMetadataOnlyDeleteFromPaimonTable.scala} | 84 +++++++++++++++++-----
.../plans/logical/PaimonTableValuedFunctions.scala | 2 +-
.../logical/TruncatePaimonTableWithFilter.scala | 39 ++++++++++
.../commands/DeleteFromPaimonTableCommand.scala | 63 ++--------------
.../commands/PaimonTruncateTableCommand.scala | 52 --------------
.../paimon/spark/execution/PaimonStrategy.scala | 12 +++-
.../TruncatePaimonTableWithFilterExec.scala | 75 +++++++++++++++++++
.../extensions/PaimonSparkSessionExtensions.scala | 7 +-
.../spark/sql/PaimonOptimizationTestBase.scala | 45 +++++++++---
15 files changed, 261 insertions(+), 219 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 36ab850d29..27fa93458b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -89,6 +89,21 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
}
}
+ override def truncatePartitions(idents: Array[InternalRow]): Boolean = {
+ val partitions = toPaimonPartitions(idents).toSeq.asJava
+ val commit = table.newBatchWriteBuilder().newCommit()
+ try {
+ commit.truncatePartitions(partitions)
+ } finally {
+ commit.close()
+ }
+ true
+ }
+
+ override def truncatePartition(ident: InternalRow): Boolean = {
+ truncatePartitions(Array(ident))
+ }
+
override def replacePartitionMetadata(
ident: InternalRow,
properties: JMap[String, String]): Unit = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 867de1109c..c0d161b224 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -31,7 +31,7 @@ import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.read.ScanBuilder
-import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.connector.write.{LogicalWriteInfo, SupportsTruncate, WriteBuilder}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
@@ -42,6 +42,7 @@ abstract class PaimonSparkTableBase(val table: Table)
extends BaseTable
with SupportsRead
with SupportsWrite
+ with TruncatableTable
with SupportsMetadataColumns {
lazy val coreOptions = new CoreOptions(table.options())
@@ -152,4 +153,10 @@ abstract class PaimonSparkTableBase(val table: Table)
throw new RuntimeException("Only FileStoreTable can be written.")
}
}
+
+ def truncateTable: Boolean = {
+ val commit = table.newBatchWriteBuilder().newCommit()
+ commit.truncateTable()
+ true
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
index c6e10fabf1..1c248302c3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.{SparkConnectorOptions, SparkTable}
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
-import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand, PaimonTruncateTableCommand}
+import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand}
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.table.FileStoreTable
@@ -330,9 +330,6 @@ case class PaimonPostHocResolutionRules(session: SparkSession) extends Rule[Logi
override def apply(plan: LogicalPlan): LogicalPlan = {
plan match {
- case t @ TruncateTable(PaimonRelation(table)) if t.resolved =>
- PaimonTruncateTableCommand(table, Map.empty)
-
case a @ AnalyzeTable(
ResolvedTable(catalog, identifier, table: SparkTable, _),
partitionSpec,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
index 46b4cc05b4..6808e64c45 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonDeleteTable.scala
@@ -19,11 +19,11 @@
package org.apache.paimon.spark.catalyst.analysis
import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.catalyst.optimizer.OptimizeMetadataOnlyDeleteFromPaimonTable
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{DeleteFromTable, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -39,8 +39,9 @@ object PaimonDeleteTable extends Rule[LogicalPlan] with RowLevelHelper {
table.coreOptions.deletionVectorsEnabled() ||
table.coreOptions.rowTrackingEnabled() ||
table.coreOptions.dataEvolutionEnabled() ||
- // todo: Optimize v2 delete when conditions are all partition filters
- condition == null || condition == TrueLiteral
+ OptimizeMetadataOnlyDeleteFromPaimonTable.isMetadataOnlyDelete(
+ baseTable.asInstanceOf[FileStoreTable],
+ condition)
}
override val operation: RowLevelOp = Delete
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
deleted file mode 100644
index bf6eb35757..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonIncompatiblePHRRules.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.catalyst.analysis
-
-import org.apache.paimon.spark.commands.PaimonTruncateTableCommand
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TruncatePartition}
-import org.apache.spark.sql.catalyst.rules.Rule
-
-/** These post-hoc resolution rules are incompatible between different versions of spark. */
-case class PaimonIncompatiblePHRRules(session: SparkSession) extends Rule[LogicalPlan] {
-
- override def apply(plan: LogicalPlan): LogicalPlan = {
- plan match {
- case t @ TruncatePartition(PaimonRelation(table), ResolvedPartitionSpec(names, ident, _))
- if t.resolved =>
- assert(names.length == ident.numFields, "Names and values of partition don't match")
- val resolver = session.sessionState.conf.resolver
- val schema = table.schema
- val partitionSpec = names.zipWithIndex.map {
- case (name, index) =>
- val field = schema.find(f => resolver(f.name, name)).getOrElse {
- throw new RuntimeException(s"$name is not a valid partition column in $schema.")
- }
-
- val partVal: String =
- if (ident.isNullAt(index)) null else ident.get(index, field.dataType).toString
- (name -> partVal)
- }.toMap
- PaimonTruncateTableCommand(table, partitionSpec)
-
- case _ => plan
- }
- }
-
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 2bed4af873..82dcb594a2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -209,17 +209,6 @@ trait ExpressionHelperBase extends PredicateHelper {
}
}
- def splitPruePartitionAndOtherPredicates(
- condition: Expression,
- partitionColumns: Seq[String],
- resolver: Resolver): (Seq[Expression], Seq[Expression]) = {
- splitConjunctivePredicates(condition)
- .partition {
- isPredicatePartitionColumnsOnly(_, partitionColumns, resolver) && !SubqueryExpression
- .hasSubquery(condition)
- }
- }
-
def isPredicatePartitionColumnsOnly(
condition: Expression,
partitionColumns: Seq[String],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/OptimizeMetadataOnlyDeleteFromPaimonTable.scala
similarity index 58%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/OptimizeMetadataOnlyDeleteFromPaimonTable.scala
index 66f8a10f37..7f28182599 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/EvalSubqueriesForDeleteTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/optimizer/OptimizeMetadataOnlyDeleteFromPaimonTable.scala
@@ -18,52 +18,98 @@
package org.apache.paimon.spark.catalyst.optimizer
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
+import org.apache.paimon.spark.catalyst.plans.logical.TruncatePaimonTableWithFilter
import org.apache.paimon.spark.commands.DeleteFromPaimonTableCommand
+import org.apache.paimon.table.FileStoreTable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{execution, PaimonSparkSession, SparkSession}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.{Expression, In, InSubquery, Literal, ScalarSubquery, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution}
+import org.apache.spark.sql.execution.ExecSubqueryExpression
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.apache.spark.sql.types.BooleanType
import scala.collection.JavaConverters._
/**
- * For those delete conditions with subqueries that only contain partition columns, we can eval them
- * in advance. So that when running [[DeleteFromPaimonTableCommand]], we can directly call
- * dropPartitions to achieve fast deletion.
+ * Similar to spark `OptimizeMetadataOnlyDeleteFromTable`. The reasons why Paimon Table does not
+ * inherit `SupportsDeleteV2` are as follows:
*
- * Note: this rule must be placed before [[MergePaimonScalarSubqueries]], because
+ * <p>1. It needs to support both V1 delete and V2 delete simultaneously.
+ *
+ * <p>2. This rule can optimize partition filters that contain subqueries.
+ *
+ * <p>Note: this rule must be placed before [[MergePaimonScalarSubqueries]], because
* [[MergePaimonScalarSubqueries]] will merge subqueries.
*/
-object EvalSubqueriesForDeleteTable extends Rule[LogicalPlan] with ExpressionHelper with Logging {
+object OptimizeMetadataOnlyDeleteFromPaimonTable
+ extends Rule[LogicalPlan]
+ with ExpressionHelper
+ with Logging {
lazy val spark: SparkSession = PaimonSparkSession.active
lazy val resolver: Resolver = spark.sessionState.conf.resolver
override def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transformDown {
- case d @ DeleteFromPaimonTableCommand(_, table, condition)
- if SubqueryExpression.hasSubquery(condition) &&
- isPredicatePartitionColumnsOnly(
- condition,
- table.partitionKeys().asScala.toSeq,
- resolver) =>
- try {
- d.copy(condition = evalSubquery(condition))
- } catch {
- case e: Throwable =>
- logInfo(s"Applying EvalSubqueriesForDeleteTable rule failed for: ${e.getMessage}")
- d
+ plan.transform {
+ case d @ DeleteFromPaimonTableCommand(r: DataSourceV2Relation, table, condition) =>
+ if (isTruncateTable(condition)) {
+ TruncatePaimonTableWithFilter(table, None)
+ } else if (isTruncatePartition(table, condition)) {
+ tryConvertToPartitionPredicate(r, table, condition) match {
+ case Some(p) => TruncatePaimonTableWithFilter(table, Some(p))
+ case _ => d
+ }
+ } else {
+ d
}
}
}
+ def isMetadataOnlyDelete(table: FileStoreTable, condition: Expression): Boolean = {
+ isTruncateTable(condition) || isTruncatePartition(table, condition)
+ }
+
+ private def isTruncateTable(condition: Expression): Boolean = {
+ condition == null || condition == TrueLiteral
+ }
+
+ private def isTruncatePartition(table: FileStoreTable, condition: Expression): Boolean = {
+ val partitionKeys = table.partitionKeys().asScala.toSeq
+
+ partitionKeys.nonEmpty &&
+ !table.coreOptions().deleteForceProduceChangelog() &&
+ isPredicatePartitionColumnsOnly(condition, partitionKeys, resolver)
+ }
+
+ private def tryConvertToPartitionPredicate(
+ relation: DataSourceV2Relation,
+ table: FileStoreTable,
+ condition: Expression): Option[PartitionPredicate] = {
+ try {
+ val partitionRowType = table.schema().logicalPartitionType()
+ // For those delete conditions with subqueries that only contain partition columns, we can eval them in advance.
+ val finalCondition = if (SubqueryExpression.hasSubquery(condition)) {
+ evalSubquery(condition)
+ } else {
+ condition
+ }
+ convertConditionToPaimonPredicate(finalCondition, relation.output, partitionRowType) match {
+ case Some(p) => Some(PartitionPredicate.fromPredicate(partitionRowType, p))
+ case None => None
+ }
+ } catch {
+ case _: Throwable => None
+ }
+ }
+
private def evalSubquery(condition: Expression): Expression = {
condition.transformDown {
case InSubquery(values, listQuery) =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
index 7e72abc4c9..e4f5e7856c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonTableValuedFunctions.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.plans.logical
import org.apache.paimon.CoreOptions
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions._
-import org.apache.paimon.table.{DataTable, FileStoreTable}
+import org.apache.paimon.table.DataTable
import org.apache.paimon.table.source.snapshot.TimeTravelUtil.InconsistentTagBucketException
import org.apache.spark.sql.PaimonUtils.createDataset
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TruncatePaimonTableWithFilter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TruncatePaimonTableWithFilter.scala
new file mode 100644
index 0000000000..323802750e
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/TruncatePaimonTableWithFilter.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.leafnode.PaimonLeafCommand
+import org.apache.paimon.table.Table
+
+/**
+ * Truncate a Paimon table with an optional partition predicate.
+ *
+ * @param partitionPredicate
+ * when it is None, the whole table is truncated
+ */
+case class TruncatePaimonTableWithFilter(
+ table: Table,
+ partitionPredicate: Option[PartitionPredicate])
+ extends PaimonLeafCommand {
+
+ override def simpleString(maxFields: Int): String = {
+ s"Truncate table $table with filter"
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 461aca90ef..ed40b5ff21 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -24,18 +24,14 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.types.RowKind
-import org.apache.paimon.utils.InternalRowPartitionComputer
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not}
-import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.expressions.{Expression, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, SupportsSubquery}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.lit
-import scala.collection.JavaConverters._
-
case class DeleteFromPaimonTableCommand(
relation: DataSourceV2Relation,
override val table: FileStoreTable,
@@ -45,61 +41,12 @@ case class DeleteFromPaimonTableCommand(
with SupportsSubquery {
override def run(sparkSession: SparkSession): Seq[Row] = {
-
- val commit = table.newBatchWriteBuilder().newCommit()
- if (condition == null || condition == TrueLiteral) {
- commit.truncateTable()
+ val commitMessages = if (usePKUpsertDelete()) {
+ performPrimaryKeyDelete(sparkSession)
} else {
- val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates(
- condition,
- table.partitionKeys().asScala.toSeq,
- sparkSession.sessionState.conf.resolver)
-
- val partitionPredicate = if (partitionCondition.isEmpty) {
- None
- } else {
- try {
- convertConditionToPaimonPredicate(
- partitionCondition.reduce(And),
- relation.output,
- table.schema.logicalPartitionType())
- } catch {
- case _: Throwable =>
- None
- }
- }
-
- if (
- otherCondition.isEmpty && partitionPredicate.nonEmpty && !table
- .coreOptions()
- .deleteForceProduceChangelog()
- ) {
- val matchedPartitions =
- table.newSnapshotReader().withPartitionFilter(partitionPredicate.get).partitions().asScala
- val rowDataPartitionComputer = new InternalRowPartitionComputer(
- table.coreOptions().partitionDefaultName(),
- table.schema().logicalPartitionType(),
- table.partitionKeys.asScala.toArray,
- table.coreOptions().legacyPartitionName()
- )
- val dropPartitions = matchedPartitions.map {
- partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
- }
- if (dropPartitions.nonEmpty) {
- commit.truncatePartitions(dropPartitions.asJava)
- } else {
- writer.commit(Seq.empty)
- }
- } else {
- val commitMessages = if (usePKUpsertDelete()) {
- performPrimaryKeyDelete(sparkSession)
- } else {
- performNonPrimaryKeyDelete(sparkSession)
- }
- writer.commit(commitMessages)
- }
+ performNonPrimaryKeyDelete(sparkSession)
}
-
+ writer.commit(commitMessages)
Seq.empty[Row]
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
deleted file mode 100644
index f55c5011e2..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.commands
-
-import org.apache.paimon.spark.SparkTable
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.sink.BatchWriteBuilder
-
-import org.apache.spark.sql.{Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-
-import java.util.{Collections, UUID}
-
-import scala.collection.JavaConverters._
-
-case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TablePartitionSpec)
- extends PaimonLeafRunnableCommand
- with WithFileStoreTable {
-
- override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
-
- override def run(sparkSession: SparkSession): Seq[Row] = {
- val commit = table.newBatchWriteBuilder().newCommit()
-
- if (partitionSpec.isEmpty) {
- commit.truncateTable()
- } else {
- commit.truncatePartitions(
- Collections.singletonList(partitionSpec.asJava)
- )
- }
-
- Seq.empty[Row]
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
index bc0627d89f..3be8b5a74e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala
@@ -18,11 +18,12 @@
package org.apache.paimon.spark.execution
+import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, SparkUtils}
import org.apache.paimon.spark.catalog.{SparkBaseCatalog, SupportView}
import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
-import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
-import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
+import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PaimonDropPartitions, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand, TruncatePaimonTableWithFilter}
+import org.apache.paimon.table.Table
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -125,7 +126,7 @@ case class PaimonStrategy(spark: SparkSession)
case _ => Nil
}
- case d @ PaimonDropPartitions(
+ case PaimonDropPartitions(
r @ ResolvedTable(_, _, table: SparkTable, _),
parts,
ifExists,
@@ -137,6 +138,11 @@ case class PaimonStrategy(spark: SparkSession)
purge,
recacheTable(r)) :: Nil
+ case TruncatePaimonTableWithFilter(
+ table: Table,
+ partitionPredicate: Option[PartitionPredicate]) =>
+ TruncatePaimonTableWithFilterExec(table, partitionPredicate) :: Nil
+
case _ => Nil
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/TruncatePaimonTableWithFilterExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/TruncatePaimonTableWithFilterExec.scala
new file mode 100644
index 0000000000..41c2187ce3
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/TruncatePaimonTableWithFilterExec.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.partition.PartitionPredicate
+import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
+import org.apache.paimon.table.{FileStoreTable, Table}
+import org.apache.paimon.utils.InternalRowPartitionComputer
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+import java.util.{Collections => JCollections}
+
+import scala.collection.JavaConverters._
+
+case class TruncatePaimonTableWithFilterExec(
+ table: Table,
+ partitionPredicate: Option[PartitionPredicate])
+ extends PaimonLeafV2CommandExec {
+
+ override def run(): Seq[InternalRow] = {
+ val commit = table.newBatchWriteBuilder().newCommit()
+
+ partitionPredicate match {
+ case Some(p) =>
+ table match {
+ case fileStoreTable: FileStoreTable =>
+ val matchedPartitions =
+ fileStoreTable.newSnapshotReader().withPartitionFilter(p).partitions().asScala
+ if (matchedPartitions.nonEmpty) {
+ val partitionComputer = new InternalRowPartitionComputer(
+ fileStoreTable.coreOptions().partitionDefaultName(),
+ fileStoreTable.schema().logicalPartitionType(),
+ fileStoreTable.partitionKeys.asScala.toArray,
+ fileStoreTable.coreOptions().legacyPartitionName()
+ )
+ val dropPartitions =
+ matchedPartitions.map(partitionComputer.generatePartValues(_).asScala.asJava)
+ commit.truncatePartitions(dropPartitions.asJava)
+ } else {
+ commit.commit(JCollections.emptyList())
+ }
+ case _ =>
+ throw new UnsupportedOperationException("Unsupported truncate table")
+ }
+ case _ =>
+ commit.truncateTable()
+ }
+ Nil
+ }
+
+ override def output: Seq[Attribute] = Nil
+
+ override def simpleString(maxFields: Int): String = {
+ s"TruncatePaimonTableWithFilterExec: ${table.fullName()}" +
+ partitionPredicate.map(p => s", PartitionPredicate: [$p]").getOrElse("")
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
index b6e29b8a77..950b5797c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/extensions/PaimonSparkSessionExtensions.scala
@@ -18,8 +18,8 @@
package org.apache.paimon.spark.extensions
-import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatiblePHRRules, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
-import org.apache.paimon.spark.catalyst.optimizer.{EvalSubqueriesForDeleteTable, MergePaimonScalarSubqueries}
+import org.apache.paimon.spark.catalyst.analysis.{PaimonAnalysis, PaimonDeleteTable, PaimonFunctionResolver, PaimonIncompatibleResolutionRules, PaimonMergeInto, PaimonPostHocResolutionRules, PaimonProcedureResolver, PaimonUpdateTable, PaimonViewResolver, ReplacePaimonFunctions, RewriteUpsertTable}
+import org.apache.paimon.spark.catalyst.optimizer.{MergePaimonScalarSubqueries, OptimizeMetadataOnlyDeleteFromPaimonTable}
import org.apache.paimon.spark.catalyst.plans.logical.PaimonTableValuedFunctions
import org.apache.paimon.spark.commands.BucketExpression
import org.apache.paimon.spark.execution.{OldCompatibleStrategy, PaimonStrategy}
@@ -46,7 +46,6 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
extensions.injectPostHocResolutionRule(spark => ReplacePaimonFunctions(spark))
extensions.injectPostHocResolutionRule(spark => PaimonPostHocResolutionRules(spark))
- extensions.injectPostHocResolutionRule(spark => PaimonIncompatiblePHRRules(spark))
extensions.injectPostHocResolutionRule(_ => PaimonUpdateTable)
extensions.injectPostHocResolutionRule(_ => PaimonDeleteTable)
@@ -65,7 +64,7 @@ class PaimonSparkSessionExtensions extends (SparkSessionExtensions => Unit) {
}
// optimization rules
- extensions.injectOptimizerRule(_ => EvalSubqueriesForDeleteTable)
+ extensions.injectOptimizerRule(_ => OptimizeMetadataOnlyDeleteFromPaimonTable)
extensions.injectOptimizerRule(_ => MergePaimonScalarSubqueries)
// planner extensions
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index d417b5f405..3eafcc1700 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -22,11 +22,13 @@ import org.apache.paimon.Snapshot.CommitKind
import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries
+import org.apache.paimon.spark.execution.TruncatePaimonTableWithFilterExec
-import org.apache.spark.sql.{PaimonUtils, Row}
+import org.apache.spark.sql.{DataFrame, PaimonUtils, Row}
import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.functions._
import org.junit.jupiter.api.Assertions
@@ -112,6 +114,23 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
}
}
+ test(s"Paimon Optimization: optimize metadata only delete") {
+ for (useV2Write <- Seq("true", "false")) {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE t (id INT, name STRING, pt INT)
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+ sql("INSERT INTO t VALUES (1, 'a', 1), (2, 'b', 2)")
+ val df = sql("DELETE FROM t WHERE pt = 1")
+ checkTruncatePaimonTable(df)
+ checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(2, "b", 2)))
+ }
+ }
+ }
+ }
+
test(s"Paimon Optimization: eval subqueries for delete table with
ScalarSubquery") {
withPk.foreach(
hasPk => {
@@ -132,14 +151,16 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
- spark.sql(s"""DELETE FROM t1 WHERE
- |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
- |AND
- |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
+ val df =
+ spark.sql(s"""DELETE FROM t1 WHERE
+ |pt >= (SELECT min(id) FROM t2 WHERE n BETWEEN 2 AND 3)
+ |AND
+ |pt <= (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
// For partition-only predicates, drop partition is called internally.
Assertions.assertEquals(
CommitKind.OVERWRITE,
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+ checkTruncatePaimonTable(df)
checkAnswer(
spark.sql("SELECT * FROM t1 ORDER BY id"),
@@ -176,14 +197,16 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
spark.sql(s"CREATE TABLE t2 (id INT, n INT)")
spark.sql("INSERT INTO t2 VALUES (1, 1), (2, 2), (3, 3), (4, 4)")
- spark.sql(s"""DELETE FROM t1 WHERE
- |pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
- |OR
- |pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
+ val df =
+ spark.sql(s"""DELETE FROM t1 WHERE
+ |pt in (SELECT id FROM t2 WHERE n BETWEEN 2 AND 3)
+ |OR
+ |pt in (SELECT max(id) FROM t2 WHERE n BETWEEN 2 AND 3)""".stripMargin)
// For partition-only predicates, drop partition is called internally.
Assertions.assertEquals(
CommitKind.OVERWRITE,
loadTable("t1").store().snapshotManager().latestSnapshot().commitKind())
+ checkTruncatePaimonTable(df)
checkAnswer(
spark.sql("SELECT * FROM t1 ORDER BY id"),
@@ -206,4 +229,8 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
def extractorExpression(cteIndex: Int, output: Seq[Attribute], fieldIndex: Int): NamedExpression
+ def checkTruncatePaimonTable(df: DataFrame): Unit = {
+ val plan = df.queryExecution.executedPlan.asInstanceOf[CommandResultExec].commandPhysicalPlan
+ assert(plan.isInstanceOf[TruncatePaimonTableWithFilterExec])
+ }
}