This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b35180fbc0 [spark] Optimize appendMetaColumns in PaimonCommand
b35180fbc0 is described below

commit b35180fbc0ef073ff4823babfd40ac4e1e9428d1
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jul 11 12:02:08 2025 +0800

    [spark] Optimize appendMetaColumns in PaimonCommand
---
 .../paimon/spark/commands/PaimonCommand.scala      | 41 +++++++---------------
 1 file changed, 13 insertions(+), 28 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 43d428c533..12d0c66622 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.sources._
 
 import java.net.URI
 import java.util.Collections
@@ -164,34 +163,20 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
     assert(sparkTable.table.isInstanceOf[FileStoreTable])
     val knownSplitsTable =
       KnownSplitsTable.create(sparkTable.table.asInstanceOf[FileStoreTable], splits.toArray)
-    val metadataColumns =
-      sparkTable.metadataColumns.map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
-
-    if (needAddMetadataColumns(metadataColumns, relation)) {
-      // We re-plan the relation to skip analyze phase, so we should append all
-      // metadata columns manually and let Spark do column pruning during optimization.
-      relation.copy(
-        table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
-        output = relation.output ++ metadataColumns
-      )
-    } else {
-      // Only re-plan the relation with new table info.
-      relation.copy(
-        table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable)
-      )
-    }
-  }
-
-  /** Check whether relation output already contains paimon metadata columns. */
-  private def needAddMetadataColumns(
-      metadataColumns: Array[AttributeReference],
-      relation: DataSourceV2Relation): Boolean = {
-    val resolve = conf.resolver
     val outputNames = relation.outputSet.map(_.name)
-
-    def isOutputColumn(colName: String): Boolean = outputNames.exists(resolve(colName, _))
-
-    !metadataColumns.exists(col => isOutputColumn(col.name))
+    def isOutputColumn(colName: String) = {
+      val resolve = conf.resolver
+      outputNames.exists(resolve(colName, _))
+    }
+    val appendMetaColumns = sparkTable.metadataColumns
+      .map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
+      .filter(col => !isOutputColumn(col.name))
+    // We re-plan the relation to skip analyze phase, so we should append needed
+    // metadata columns manually and let Spark do column pruning during optimization.
+    relation.copy(
+      table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
+      output = relation.output ++ appendMetaColumns
+    )
   }
 
   /** Notice that, the key is a relative path, not just the file name. */
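
For readers following the change: the old code either appended every metadata column or none, depending on whether any of them already appeared in the relation's output; the new code filters per column, appending only the metadata columns the output is missing. Below is a minimal standalone sketch of that filtering step. Column, caseInsensitiveResolver, and the example column names are illustrative stand-ins for Spark's AttributeReference, conf.resolver, and Paimon's metadata columns, not the project's actual classes.

    // Illustrative sketch only: Column and caseInsensitiveResolver are
    // hypothetical stand-ins for Spark's AttributeReference and conf.resolver.
    object AppendMetaColumnsSketch {
      final case class Column(name: String)

      // Spark's default resolver is case-insensitive; model it the same way here.
      val caseInsensitiveResolver: (String, String) => Boolean =
        (requested, existing) => requested.equalsIgnoreCase(existing)

      // Keep only the metadata columns whose names are absent from the output.
      def missingMetaColumns(output: Seq[Column], meta: Seq[Column]): Seq[Column] = {
        val outputNames = output.map(_.name)
        def isOutputColumn(name: String): Boolean =
          outputNames.exists(caseInsensitiveResolver(name, _))
        meta.filterNot(col => isOutputColumn(col.name))
      }

      def main(args: Array[String]): Unit = {
        // Hypothetical column names for illustration.
        val output = Seq(Column("id"), Column("_file_path"))
        val meta = Seq(Column("_file_path"), Column("_row_index"))
        // Prints List(_row_index): _file_path is already in the output.
        println(missingMetaColumns(output, meta).map(_.name))
      }
    }

Comparing names through the resolver, as the committed code does, keeps the de-duplication consistent with Spark's name-resolution rules instead of relying on a plain string match.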
