This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b35180fbc0 [spark] Optimize appendMetaColumns in PaimonCommand
b35180fbc0 is described below
commit b35180fbc0ef073ff4823babfd40ac4e1e9428d1
Author: JingsongLi <[email protected]>
AuthorDate: Fri Jul 11 12:02:08 2025 +0800
[spark] Optimize appendMetaColumns in PaimonCommand
---
 .../paimon/spark/commands/PaimonCommand.scala | 41 +++++++---------------
 1 file changed, 13 insertions(+), 28 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 43d428c533..12d0c66622 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -40,7 +40,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference,
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-import org.apache.spark.sql.sources._
 
 import java.net.URI
 import java.util.Collections
@@ -164,34 +163,20 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
     assert(sparkTable.table.isInstanceOf[FileStoreTable])
     val knownSplitsTable =
       KnownSplitsTable.create(sparkTable.table.asInstanceOf[FileStoreTable], splits.toArray)
-    val metadataColumns =
-      sparkTable.metadataColumns.map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
-
-    if (needAddMetadataColumns(metadataColumns, relation)) {
-      // We re-plan the relation to skip analyze phase, so we should append all
-      // metadata columns manually and let Spark do column pruning during optimization.
-      relation.copy(
-        table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
-        output = relation.output ++ metadataColumns
-      )
-    } else {
-      // Only re-plan the relation with new table info.
-      relation.copy(
-        table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable)
-      )
-    }
-  }
-
-  /** Check whether relation output already contains paimon metadata columns. */
-  private def needAddMetadataColumns(
-      metadataColumns: Array[AttributeReference],
-      relation: DataSourceV2Relation): Boolean = {
-    val resolve = conf.resolver
     val outputNames = relation.outputSet.map(_.name)
-
-    def isOutputColumn(colName: String): Boolean = outputNames.exists(resolve(colName, _))
-
-    !metadataColumns.exists(col => isOutputColumn(col.name))
+    def isOutputColumn(colName: String) = {
+      val resolve = conf.resolver
+      outputNames.exists(resolve(colName, _))
+    }
+    val appendMetaColumns = sparkTable.metadataColumns
+      .map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
+      .filter(col => !isOutputColumn(col.name))
+    // We re-plan the relation to skip analyze phase, so we should append needed
+    // metadata columns manually and let Spark do column pruning during optimization.
+    relation.copy(
+      table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
+      output = relation.output ++ appendMetaColumns
+    )
   }
 
   /** Notice that, the key is a relative path, not just the file name. */
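
For reference, the rewritten section reads as below after this commit, assembled
from the second hunk's context and "+" lines. This is a sketch only: the enclosing
method name and signature do not appear in the diff, so "replanWithKnownSplits"
and its parameter types are assumptions for illustration.

    // Sketch: name and signature assumed; the body is taken from the diff.
    private def replanWithKnownSplits(
        sparkTable: SparkTable,
        relation: DataSourceV2Relation,
        splits: Seq[DataSplit]): DataSourceV2Relation = {
      assert(sparkTable.table.isInstanceOf[FileStoreTable])
      val knownSplitsTable =
        KnownSplitsTable.create(sparkTable.table.asInstanceOf[FileStoreTable], splits.toArray)

      // Column names the relation already exposes, matched with the session
      // resolver so case sensitivity follows the SQL config.
      val outputNames = relation.outputSet.map(_.name)
      def isOutputColumn(colName: String) = {
        val resolve = conf.resolver
        outputNames.exists(resolve(colName, _))
      }

      // Keep only the Paimon metadata columns not already in the output,
      // replacing the old all-or-nothing needAddMetadataColumns check.
      val appendMetaColumns = sparkTable.metadataColumns
        .map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
        .filter(col => !isOutputColumn(col.name))

      // We re-plan the relation to skip analyze phase, so we should append needed
      // metadata columns manually and let Spark do column pruning during optimization.
      relation.copy(
        table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
        output = relation.output ++ appendMetaColumns
      )
    }

The behavioral difference, per the diff: the removed needAddMetadataColumns helper
returned true only when no metadata column was present in the relation output, so
the old code appended either all metadata columns or none. The new code filters per
column, so a relation that already exposes some metadata columns still gets the
missing ones appended. The first hunk also drops the org.apache.spark.sql.sources._
import, presumably unused after this cleanup.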