This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d8a728ae4e [spark] delete with metadata columns generates duplicate relation fields (#5802)
d8a728ae4e is described below

commit d8a728ae4e522caf5f549194c5b6b14518235a18
Author: askwang <[email protected]>
AuthorDate: Fri Jul 11 11:55:52 2025 +0800

    [spark] delete with metadata columns generates duplicate relation fields (#5802)
---
 .../paimon/spark/commands/PaimonCommand.scala      | 37 +++++++++++++++++-----
 .../paimon/spark/sql/DeleteFromTableTestBase.scala | 19 +++++++++++
 2 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 907d8be85c..43d428c533 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -36,7 +36,7 @@ import org.apache.paimon.utils.SerializationUtils
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, 
AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => 
FilterLogicalNode, LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -164,13 +164,34 @@ trait PaimonCommand extends WithFileStoreTable with 
ExpressionHelper with SQLCon
     assert(sparkTable.table.isInstanceOf[FileStoreTable])
     val knownSplitsTable =
       KnownSplitsTable.create(sparkTable.table.asInstanceOf[FileStoreTable], 
splits.toArray)
-    // We re-plan the relation to skip analyze phase, so we should append all
-    // metadata columns manually and let Spark do column pruning during 
optimization.
-    relation.copy(
-      table = relation.table.asInstanceOf[SparkTable].copy(table = 
knownSplitsTable),
-      output = relation.output ++ sparkTable.metadataColumns.map(
-        _.asInstanceOf[PaimonMetadataColumn].toAttribute)
-    )
+    val metadataColumns =
+      
sparkTable.metadataColumns.map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
+
+    if (needAddMetadataColumns(metadataColumns, relation)) {
+      // We re-plan the relation to skip analyze phase, so we should append all
+      // metadata columns manually and let Spark do column pruning during 
optimization.
+      relation.copy(
+        table = relation.table.asInstanceOf[SparkTable].copy(table = 
knownSplitsTable),
+        output = relation.output ++ metadataColumns
+      )
+    } else {
+      // Only re-plan the relation with new table info.
+      relation.copy(
+        table = relation.table.asInstanceOf[SparkTable].copy(table = 
knownSplitsTable)
+      )
+    }
+  }
+
+  /** Returns true only if none of the paimon metadata columns is already in the relation output. */
+  private def needAddMetadataColumns(
+      metadataColumns: Array[AttributeReference],
+      relation: DataSourceV2Relation): Boolean = {
+    val resolve = conf.resolver
+    val outputNames = relation.outputSet.map(_.name)
+
+    def isOutputColumn(colName: String): Boolean = 
outputNames.exists(resolve(colName, _))
+
+    !metadataColumns.exists(col => isOutputColumn(col.name))
   }
 
   /** Notice that, the key is a relative path, not just the file name. */
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 849e4cc982..43fe3d71b2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -384,4 +384,23 @@ abstract class DeleteFromTableTestBase extends 
PaimonSparkTestBase {
       Seq(Row(2, "2024-12-16"), Row(4, "2024-12-18"))
     )
   }
+
+  test("Paimon delete: delete with metadata column") {
+    spark.sql(s"""
+                 |CREATE TABLE T (
+                 |    id BIGINT,
+                 |    c1 STRING)
+                 |TBLPROPERTIES ('bucket' = '1', 'bucket-key' = 'id', 
'file.format'='avro')
+                 |""".stripMargin)
+    spark.sql("insert into table T values(1, 'a')")
+
+    val paths = spark.sql("SELECT __paimon_file_path FROM T").collect()
+    assert(paths.length == 1)
+
+    val path = paths(0).getString(0)
+    spark.sql(s"delete from T where __paimon_file_path = '$path'")
+
+    val paths2 = spark.sql("SELECT __paimon_file_path FROM T").collect()
+    assert(paths2.length == 0)
+  }
 }

Reply via email to