This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d8a728ae4e [spark] delete with metadata columns generates duplicate relation fields (#5802)
d8a728ae4e is described below
commit d8a728ae4e522caf5f549194c5b6b14518235a18
Author: askwang <[email protected]>
AuthorDate: Fri Jul 11 11:55:52 2025 +0800
[spark] delete with metadata columns generates duplicate relation fields (#5802)
---
.../paimon/spark/commands/PaimonCommand.scala | 37 +++++++++++++++++-----
.../paimon/spark/sql/DeleteFromTableTestBase.scala | 19 +++++++++++
2 files changed, 48 insertions(+), 8 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 907d8be85c..43d428c533 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -36,7 +36,7 @@ import org.apache.paimon.utils.SerializationUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -164,13 +164,34 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
assert(sparkTable.table.isInstanceOf[FileStoreTable])
val knownSplitsTable =
KnownSplitsTable.create(sparkTable.table.asInstanceOf[FileStoreTable], splits.toArray)
- // We re-plan the relation to skip analyze phase, so we should append all
- // metadata columns manually and let Spark do column pruning during optimization.
- relation.copy(
- table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
- output = relation.output ++ sparkTable.metadataColumns.map(
- _.asInstanceOf[PaimonMetadataColumn].toAttribute)
- )
+ val metadataColumns =
+ sparkTable.metadataColumns.map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
+
+ if (needAddMetadataColumns(metadataColumns, relation)) {
+ // We re-plan the relation to skip analyze phase, so we should append all
+ // metadata columns manually and let Spark do column pruning during optimization.
+ relation.copy(
+ table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
+ output = relation.output ++ metadataColumns
+ )
+ } else {
+ // Only re-plan the relation with new table info.
+ relation.copy(
+ table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable)
+ )
+ }
+ }
+
+ /** Check whether relation output already contains paimon metadata columns. */
+ private def needAddMetadataColumns(
+ metadataColumns: Array[AttributeReference],
+ relation: DataSourceV2Relation): Boolean = {
+ val resolve = conf.resolver
+ val outputNames = relation.outputSet.map(_.name)
+
+ def isOutputColumn(colName: String): Boolean = outputNames.exists(resolve(colName, _))
+
+ !metadataColumns.exists(col => isOutputColumn(col.name))
}
/** Notice that, the key is a relative path, not just the file name. */
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 849e4cc982..43fe3d71b2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -384,4 +384,23 @@ abstract class DeleteFromTableTestBase extends PaimonSparkTestBase {
Seq(Row(2, "2024-12-16"), Row(4, "2024-12-18"))
)
}
+
+ test("Paimon delete: delete with metadata column") {
+ spark.sql(s"""
+ |CREATE TABLE T (
+ | id BIGINT,
+ | c1 STRING)
+ |TBLPROPERTIES ('bucket' = '1', 'bucket-key' = 'id', 'file.format'='avro')
+ |""".stripMargin)
+ spark.sql("insert into table T values(1, 'a')")
+
+ val paths = spark.sql("SELECT __paimon_file_path FROM T").collect()
+ assert(paths.length == 1)
+
+ val path = paths(0).getString(0)
+ spark.sql(s"delete from T where __paimon_file_path = '$path'")
+
+ val paths2 = spark.sql("SELECT __paimon_file_path FROM T").collect()
+ assert(paths2.length == 0)
+ }
}
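
Below is a minimal, standalone sketch (not part of the commit) of the check that needAddMetadataColumns introduces above. The object name, the hard-coded column names, and the explicit case-insensitive resolver are illustrative assumptions; the real method compares Spark AttributeReference names via conf.resolver. The idea is that metadata columns are appended to the re-planned relation only when none of them already appears in the relation output, which is what avoids the duplicate relation fields this commit fixes.

// Standalone sketch, assuming simplified types (column names as plain Strings).
object NeedAddMetadataColumnsSketch {
  // Models Spark's default case-insensitive resolver as a plain function.
  private val resolver: (String, String) => Boolean =
    (a: String, b: String) => a.equalsIgnoreCase(b)

  // True if none of the metadata columns is already part of the relation output.
  def needAddMetadataColumns(metadataColumns: Seq[String], outputNames: Seq[String]): Boolean = {
    def isOutputColumn(colName: String): Boolean = outputNames.exists(resolver(colName, _))
    !metadataColumns.exists(col => isOutputColumn(col))
  }

  def main(args: Array[String]): Unit = {
    val metadataColumns = Seq("__paimon_file_path", "__paimon_row_index")
    // DELETE whose condition only uses data columns: metadata columns must still be appended.
    println(needAddMetadataColumns(metadataColumns, Seq("id", "c1"))) // true
    // DELETE whose condition references __paimon_file_path (as in the new test): the analyzed
    // relation already carries it, so appending it again would duplicate the field.
    println(needAddMetadataColumns(metadataColumns, Seq("id", "c1", "__paimon_file_path"))) // false
  }
}

With this guard in place, the DELETE in the new test (filtering on __paimon_file_path) re-plans the relation with the new KnownSplitsTable without appending the metadata attribute a second time.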