This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d64716936d [spark] Only read necessary meta columns when building new dataframe (#6084)
d64716936d is described below

commit d64716936d606d49fc8e52ef82bb9831b3cd81d8
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Aug 18 15:18:08 2025 +0800

    [spark] Only read necessary meta columns when building new dataframe (#6084)
---
 .../paimon/spark/catalyst/Compatibility.scala      | 11 +--
 .../paimon/spark/catalyst/Compatibility.scala      | 11 +--
 .../paimon/spark/procedure/CompactProcedure.java   |  9 +--
 .../org/apache/paimon/spark/PaimonSplitScan.scala  |  7 --
 .../paimon/spark/catalyst/Compatibility.scala      | 11 +--
 .../commands/DeleteFromPaimonTableCommand.scala    |  2 +-
 .../spark/commands/MergeIntoPaimonTable.scala      | 46 ++++++-----
 .../paimon/spark/commands/PaimonCommand.scala      | 92 +++++-----------------
 .../spark/commands/UpdatePaimonTableCommand.scala  | 26 +++---
 .../paimon/spark/schema/PaimonMetadataColumn.scala | 21 +++--
 .../apache/paimon/spark/util/ScanPlanHelper.scala  | 76 ++++++++++++++++++
 .../apache/paimon/spark/PaimonSparkTestBase.scala  |  2 +-
 .../spark/sql/PaimonOptimizationTestBase.scala     |  6 +-
 .../paimon/spark/util/ScanPlanHelperTest.scala     | 55 +++++++++++++
 .../scala/org/apache/spark/sql/paimon/Utils.scala  |  9 +--
 15 files changed, 213 insertions(+), 171 deletions(-)

diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b552bcd1e7..02e6efa6c6 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,23 +18,14 @@
 
 package org.apache.paimon.spark.catalyst
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
 object Compatibility {
 
-  def createDataSourceV2ScanRelation(
-      relation: DataSourceV2Relation,
-      scan: Scan,
-      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
-    DataSourceV2ScanRelation(relation, scan, output)
-  }
-
   def withNewQuery(o: V2WriteCommand, query: LogicalPlan): V2WriteCommand = {
     o.withNewQuery(query)
   }
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b552bcd1e7..02e6efa6c6 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,23 +18,14 @@
 
 package org.apache.paimon.spark.catalyst
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
 object Compatibility {
 
-  def createDataSourceV2ScanRelation(
-      relation: DataSourceV2Relation,
-      scan: Scan,
-      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
-    DataSourceV2ScanRelation(relation, scan, output)
-  }
-
   def withNewQuery(o: V2WriteCommand, query: LogicalPlan): V2WriteCommand = {
     o.withNewQuery(query)
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 8e299b5200..dd36e2f570 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -29,12 +29,11 @@ import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.spark.PaimonSplitScan;
 import org.apache.paimon.spark.SparkUtils;
-import org.apache.paimon.spark.catalyst.Compatibility;
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
 import org.apache.paimon.spark.commands.PaimonSparkWriter;
 import org.apache.paimon.spark.sort.TableSorter;
+import org.apache.paimon.spark.util.ScanPlanHelper$;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.SpecialFields;
@@ -508,10 +507,8 @@ public class CompactProcedure extends BaseProcedure {
                                     Dataset<Row> dataset =
                                             PaimonUtils.createDataset(
                                                     spark(),
-                                                    Compatibility.createDataSourceV2ScanRelation(
-                                                            relation,
-                                                            PaimonSplitScan.apply(table, split),
-                                                            relation.output()));
+                                                    ScanPlanHelper$.MODULE$.createNewScanPlan(
+                                                            split, relation));
                                     return sorter.sort(dataset);
                                 })
                         .reduce(Dataset::union)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index e0fac87518..91492de4e4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -59,10 +59,3 @@ case class PaimonSplitScan(
     s"PaimonSplitScan: [${table.name}]" + pushedFiltersStr
   }
 }
-
-object PaimonSplitScan {
-  def apply(table: InnerTable, dataSplits: Array[DataSplit]): PaimonSplitScan = {
-    val requiredSchema = SparkTypeUtils.fromPaimonRowType(table.rowType)
-    new PaimonSplitScan(table, dataSplits, requiredSchema, Seq.empty)
-  }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 5f78cda21b..1fc260fc19 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,23 +18,14 @@
 
 package org.apache.paimon.spark.catalyst
 
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 
 object Compatibility {
 
-  def createDataSourceV2ScanRelation(
-      relation: DataSourceV2Relation,
-      scan: Scan,
-      output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
-    DataSourceV2ScanRelation(relation, scan, output)
-  }
-
   def withNewQuery(o: V2WriteCommand, query: LogicalPlan): V2WriteCommand = {
     o.withNewQuery(query)
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 55a387dd0f..b6045b5219 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -143,7 +143,7 @@ case class DeleteFromPaimonTableCommand(
 
       // Step3: the smallest range of data files that need to be rewritten.
       val (touchedFiles, newRelation) =
-        createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
+        extractFilesAndCreateNewScan(touchedFilePaths, dataFilePathToMeta, relation)
 
       // Step4: build a dataframe that contains the unchanged data, and write out them.
       val toRewriteScanRelation = Filter(Not(condition), newRelation)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index d54639310b..6b170a39b4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, FILE_PATH_COLUMN, ROW_INDEX, ROW_INDEX_COLUMN}
+import org.apache.paimon.spark.schema.PaimonMetadataColumn._
 import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
 import org.apache.paimon.table.{FileStoreTable, SpecialFields}
 import org.apache.paimon.table.sink.CommitMessage
@@ -102,19 +102,14 @@ case class MergeIntoPaimonTable(
 
     if (deletionVectorsEnabled) {
       // Step2: generate dataset that should contains ROW_KIND, FILE_PATH, ROW_INDEX columns
-      val metadataCols = Seq(FILE_PATH, ROW_INDEX)
-      val filteredRelation = createDataset(
+      val filteredDf = createDataset(
         sparkSession,
-        createNewScanPlan(
-          candidateDataSplits,
-          targetOnlyCondition.getOrElse(TrueLiteral),
-          relation,
-          metadataCols))
+        createNewScanPlan(candidateDataSplits, relation, targetOnlyCondition))
       val ds = constructChangedRows(
         sparkSession,
-        filteredRelation,
+        selectWithDvMetaCols(filteredDf),
         remainDeletedRow = true,
-        extraMetadataCols = metadataCols)
+        extraMetadataCols = dvMetaCols)
 
       ds.cache()
       try {
@@ -168,24 +163,33 @@ case class MergeIntoPaimonTable(
         filePathsToRead --= noMatchedBySourceFilePaths
       }
 
-      val (filesToRewritten, touchedFileRelation) =
-        createNewRelation(filePathsToRewritten.toArray, dataFilePathToMeta, relation)
-      val (_, unTouchedFileRelation) =
-        createNewRelation(filePathsToRead.toArray, dataFilePathToMeta, relation)
+      val (filesToRewritten, filesToRewrittenScan) =
+        extractFilesAndCreateNewScan(filePathsToRewritten.toArray, dataFilePathToMeta, relation)
+      val (_, filesToReadScan) =
+        extractFilesAndCreateNewScan(filePathsToRead.toArray, dataFilePathToMeta, relation)
+
+      // If no files need to be rewritten, no need to write row lineage
+      val writeRowLineage = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
 
       // Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
       // modified and was from touched file, it should be kept too.
-      val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
-        .withColumn(FILE_TOUCHED_COL, lit(true))
-        .union(createDataset(sparkSession, unTouchedFileRelation)
-          .withColumn(FILE_TOUCHED_COL, lit(false)))
+      var filesToRewrittenDS =
+        createDataset(sparkSession, filesToRewrittenScan).withColumn(FILE_TOUCHED_COL, lit(true))
+      if (writeRowLineage) {
+        filesToRewrittenDS = selectWithRowLineageMetaCols(filesToRewrittenDS)
+      }
 
-      // If no files need to be rewritten, no need to write row lineage
-      val writeRowLineage = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
+      var filesToReadDS =
+        createDataset(sparkSession, filesToReadScan).withColumn(FILE_TOUCHED_COL, lit(false))
+      if (writeRowLineage) {
+        // For filesToReadScan we don't need to read row lineage meta cols, just add placeholders
+        ROW_LINEAGE_META_COLUMNS.foreach(
+          c => filesToReadDS = filesToReadDS.withColumn(c, lit(null)))
+      }
 
       val toWriteDS = constructChangedRows(
         sparkSession,
-        targetDSWithFileTouchedCol,
+        filesToRewrittenDS.union(filesToReadDS),
         writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
 
       val writer = if (writeRowLineage) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 4fe0678951..f628f31f16 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -23,12 +23,11 @@ import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, BitmapDeletion
 import org.apache.paimon.fs.Path
 import org.apache.paimon.index.IndexFileMeta
 import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
-import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.table.{BucketMode, InnerTable, KnownSplitsTable}
+import org.apache.paimon.spark.util.ScanPlanHelper
+import org.apache.paimon.table.BucketMode
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.utils.SerializationUtils
@@ -38,8 +37,9 @@ import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.col
 
 import java.net.URI
 import java.util.Collections
@@ -47,7 +47,11 @@ import java.util.Collections
 import scala.collection.JavaConverters._
 
 /** Helper trait for all paimon commands. */
-trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLConfHelper {
+trait PaimonCommand
+  extends WithFileStoreTable
+  with ExpressionHelper
+  with ScanPlanHelper
+  with SQLConfHelper {
 
   lazy val dvSafeWriter: PaimonSparkWriter = {
     if (table.primaryKeys().isEmpty && table.bucketMode() == BucketMode.HASH_FIXED) {
@@ -102,7 +106,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
       }
     }
 
-    val filteredRelation = createNewScanPlan(candidateDataSplits, condition, relation)
+    val filteredRelation = createNewScanPlan(candidateDataSplits, relation, Some(condition))
     findTouchedFiles(createDataset(sparkSession, filteredRelation), sparkSession)
   }
 
@@ -119,65 +123,18 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
       .map(relativePath)
   }
 
-  protected def createNewScanPlan(
-      candidateDataSplits: Seq[DataSplit],
-      condition: Expression,
-      relation: DataSourceV2Relation,
-      metadataColumns: Seq[PaimonMetadataColumn]): LogicalPlan = {
-    val newRelation = createNewScanPlan(candidateDataSplits, condition, relation)
-    val resolvedMetadataColumns = metadataColumns.map {
-      col =>
-        val attr = newRelation.resolve(col.name :: Nil, conf.resolver)
-        assert(attr.isDefined)
-        attr.get
-    }
-    Project(relation.output ++ resolvedMetadataColumns, newRelation)
-  }
-
-  protected def createNewScanPlan(
-      candidateDataSplits: Seq[DataSplit],
-      condition: Expression,
-      relation: DataSourceV2Relation): LogicalPlan = {
-    val newRelation = createNewRelation(candidateDataSplits, relation)
-    FilterLogicalNode(condition, newRelation)
-  }
-
-  protected def createNewRelation(
+  protected def extractFilesAndCreateNewScan(
       filePaths: Array[String],
       filePathToMeta: Map[String, SparkDataFileMeta],
-      relation: DataSourceV2Relation): (Array[SparkDataFileMeta], DataSourceV2Relation) = {
+      relation: DataSourceV2Relation): (Array[SparkDataFileMeta], LogicalPlan) = {
     val files = filePaths.map(
       file => filePathToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file")))
     val touchedDataSplits =
       SparkDataFileMeta.convertToDataSplits(files, rawConvertible = true, fileStore.pathFactory())
-    val newRelation = createNewRelation(touchedDataSplits, relation)
+    val newRelation = createNewScanPlan(touchedDataSplits, relation)
     (files, newRelation)
   }
 
-  protected def createNewRelation(
-      splits: Seq[DataSplit],
-      relation: DataSourceV2Relation): DataSourceV2Relation = {
-    assert(relation.table.isInstanceOf[SparkTable])
-    val sparkTable = relation.table.asInstanceOf[SparkTable]
-    assert(sparkTable.table.isInstanceOf[InnerTable])
-    val knownSplitsTable =
-      KnownSplitsTable.create(sparkTable.table.asInstanceOf[InnerTable], splits.toArray)
-    val outputNames = relation.outputSet.map(_.name)
-    def isOutputColumn(colName: String) = {
-      val resolve = conf.resolver
-      outputNames.exists(resolve(colName, _))
-    }
-    val appendMetaColumns = sparkTable.metadataColumns
-      .map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
-      .filter(col => !isOutputColumn(col.name))
-    // We re-plan the relation to skip analyze phase, so we should append needed
-    // metadata columns manually and let Spark do column pruning during optimization.
-    relation.copy(
-      table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
-      output = relation.output ++ appendMetaColumns
-    )
-  }
-
   /** Notice that, the key is a relative path, not just the file name. */
   protected def candidateFileMap(
       candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
@@ -196,34 +153,25 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
       condition: Expression,
       relation: DataSourceV2Relation,
       sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
-    val filteredRelation = createNewScanPlan(candidateDataSplits, condition, relation)
-    val dataWithMetadataColumns = createDataset(sparkSession, filteredRelation)
-    collectDeletionVectors(dataFilePathToMeta, dataWithMetadataColumns, sparkSession)
+    val filteredRelation =
+      createNewScanPlan(candidateDataSplits, relation, Some(condition))
+    val dataset = createDataset(sparkSession, filteredRelation)
+    collectDeletionVectors(dataFilePathToMeta, dataset, sparkSession)
   }
 
   protected def collectDeletionVectors(
       dataFilePathToMeta: Map[String, SparkDataFileMeta],
-      dataWithMetadataColumns: Dataset[Row],
+      dataset: Dataset[Row],
       sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
     import sparkSession.implicits._
-
-    val resolver = sparkSession.sessionState.conf.resolver
-    Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN).foreach {
-      metadata =>
-        dataWithMetadataColumns.schema
-          .find(field => resolver(field.name, metadata))
-          .orElse(throw new RuntimeException(
-            "This input dataset doesn't contains the required metadata columns: __paimon_file_path and __paimon_row_index."))
-    }
-
     val dataFileToPartitionAndBucket =
       dataFilePathToMeta.mapValues(meta => (meta.partition, meta.bucket)).toArray
 
     val my_table = table
     val location = my_table.location
     val dvBitmap64 = my_table.coreOptions().deletionVectorBitmap64()
-    dataWithMetadataColumns
-      .select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
+    dataset
+      .select(DV_META_COLUMNS.map(col): _*)
       .as[(String, Long)]
       .groupByKey(_._1)
       .mapGroups {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index e7d43aa936..6b47cd74f8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
 import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.{FileStoreTable, SpecialFields}
 import org.apache.paimon.table.sink.CommitMessage
@@ -30,7 +31,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, SupportsSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, LogicalPlan, Project, SupportsSubquery}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, lit}
 
@@ -77,7 +78,7 @@ case class UpdatePaimonTableCommand(
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
 
     if (candidateDataSplits.isEmpty) {
-      // no data spilt need to be rewrote
+      // no data spilt need to be rewritten
       logDebug("No file need to rewrote. It's an empty Commit.")
       Seq.empty[CommitMessage]
     } else {
@@ -114,12 +115,12 @@ case class UpdatePaimonTableCommand(
 
         // Step3: the smallest range of data files that need to be rewritten.
         val (touchedFiles, touchedFileRelation) =
-          createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
+          extractFilesAndCreateNewScan(touchedFilePaths, dataFilePathToMeta, relation)
 
         // Step4: build a dataframe that contains the unchanged and updated data, and write out them.
         val addCommitMessage = writeUpdatedAndUnchangedData(sparkSession, touchedFileRelation)
 
-        // Step5: convert the deleted files that need to be wrote to commit message.
+        // Step5: convert the deleted files that need to be written to commit message.
         val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
 
         addCommitMessage ++ deletedCommitMessage
@@ -135,19 +136,14 @@ case class UpdatePaimonTableCommand(
         toColumn(update).as(origin.name, origin.metadata)
     }
 
-    val toUpdateScanRelation = createNewRelation(touchedDataSplits, relation)
-    val newPlan = if (condition == TrueLiteral) {
-      toUpdateScanRelation
-    } else {
-      Filter(condition, toUpdateScanRelation)
-    }
-    val data = createDataset(sparkSession, newPlan).select(updateColumns: _*)
+    val toUpdateScanRelation = createNewScanPlan(touchedDataSplits, relation, Some(condition))
+    val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
     dvSafeWriter.write(data)
   }
 
   private def writeUpdatedAndUnchangedData(
       sparkSession: SparkSession,
-      toUpdateScanRelation: DataSourceV2Relation): Seq[CommitMessage] = {
+      toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
     var updateColumns = updateExpressions.zip(relation.output).map {
       case (update, origin) =>
         val updated = optimizedIf(condition, update, origin)
@@ -156,13 +152,13 @@ case class UpdatePaimonTableCommand(
 
     if (coreOptions.rowTrackingEnabled()) {
       updateColumns ++= Seq(
-        col(SpecialFields.ROW_ID.name()),
+        col(ROW_ID_COLUMN),
         toColumn(
           optimizedIf(
             condition,
             Literal(null),
-            toExpression(sparkSession, col(SpecialFields.SEQUENCE_NUMBER.name()))))
-          .as(SpecialFields.SEQUENCE_NUMBER.name())
+            toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
+          .as(SEQUENCE_NUMBER_COLUMN)
       )
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 4f3248c0ff..4bf6b6c743 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -48,14 +48,19 @@ object PaimonMetadataColumn {
   val FILE_PATH_COLUMN = "__paimon_file_path"
   val PARTITION_COLUMN = "__paimon_partition"
   val BUCKET_COLUMN = "__paimon_bucket"
+  val ROW_ID_COLUMN = SpecialFields.ROW_ID.name()
+  val SEQUENCE_NUMBER_COLUMN = SpecialFields.SEQUENCE_NUMBER.name()
+
+  val DV_META_COLUMNS: Seq[String] = Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
+  val ROW_LINEAGE_META_COLUMNS: Seq[String] = Seq(ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN)
 
   val SUPPORTED_METADATA_COLUMNS: Seq[String] = Seq(
     ROW_INDEX_COLUMN,
     FILE_PATH_COLUMN,
     PARTITION_COLUMN,
     BUCKET_COLUMN,
-    SpecialFields.ROW_ID.name(),
-    SpecialFields.SEQUENCE_NUMBER.name()
+    ROW_ID_COLUMN,
+    SEQUENCE_NUMBER_COLUMN
   )
 
   val ROW_INDEX: PaimonMetadataColumn =
@@ -68,9 +73,13 @@ object PaimonMetadataColumn {
   val BUCKET: PaimonMetadataColumn =
     PaimonMetadataColumn(Int.MaxValue - 103, BUCKET_COLUMN, IntegerType)
   val ROW_ID: PaimonMetadataColumn =
-    PaimonMetadataColumn(Int.MaxValue - 104, SpecialFields.ROW_ID.name(), LongType)
+    PaimonMetadataColumn(Int.MaxValue - 104, ROW_ID_COLUMN, LongType)
   val SEQUENCE_NUMBER: PaimonMetadataColumn =
-    PaimonMetadataColumn(Int.MaxValue - 105, SpecialFields.SEQUENCE_NUMBER.name(), LongType)
+    PaimonMetadataColumn(Int.MaxValue - 105, SEQUENCE_NUMBER_COLUMN, LongType)
+
+  def dvMetaCols: Seq[PaimonMetadataColumn] = Seq(FILE_PATH, ROW_INDEX)
+
+  def rowLineageMetaCols: Seq[PaimonMetadataColumn] = Seq(ROW_ID, SEQUENCE_NUMBER)
 
   def get(metadataColumn: String, partitionType: StructType): PaimonMetadataColumn = {
     metadataColumn match {
@@ -78,8 +87,8 @@ object PaimonMetadataColumn {
       case FILE_PATH_COLUMN => FILE_PATH
       case PARTITION_COLUMN => PARTITION(partitionType)
       case BUCKET_COLUMN => BUCKET
-      case ROW_ID.name => ROW_ID
-      case SEQUENCE_NUMBER.name => SEQUENCE_NUMBER
+      case ROW_ID_COLUMN => ROW_ID
+      case SEQUENCE_NUMBER_COLUMN => SEQUENCE_NUMBER
       case _ =>
         throw new IllegalArgumentException(s"$metadataColumn metadata column is not supported.")
     }
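
For context on the hunks above and below: the metadata column names and their groupings are now centralized in PaimonMetadataColumn, so commands can reference a group instead of spelling out SpecialFields names. A small illustrative sketch of the new constants and accessors (the object and val names below are hypothetical, not part of the commit):

    import org.apache.paimon.spark.schema.PaimonMetadataColumn
    import org.apache.paimon.spark.schema.PaimonMetadataColumn._

    // Hypothetical object, just to show the new groupings side by side.
    object MetaColumnGroupsExample {
      // Column-name groups added above.
      val dvNames: Seq[String] = DV_META_COLUMNS               // __paimon_file_path, __paimon_row_index
      val lineageNames: Seq[String] = ROW_LINEAGE_META_COLUMNS // row-id / sequence-number names from SpecialFields
      // Matching metadata-column objects, e.g. passed as extraMetadataCols in MergeIntoPaimonTable.
      val dvCols: Seq[PaimonMetadataColumn] = dvMetaCols                // Seq(FILE_PATH, ROW_INDEX)
      val lineageCols: Seq[PaimonMetadataColumn] = rowLineageMetaCols   // Seq(ROW_ID, SEQUENCE_NUMBER)
    }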
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
new file mode 100644
index 0000000000..437a563ca8
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{DV_META_COLUMNS, ROW_LINEAGE_META_COLUMNS}
+import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
+import org.apache.paimon.table.source.DataSplit
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.col
+
+trait ScanPlanHelper extends SQLConfHelper {
+
+  /** Create a new scan plan from a relation with the given data splits, condition(optional). */
+  def createNewScanPlan(
+      dataSplits: Seq[DataSplit],
+      relation: DataSourceV2Relation,
+      condition: Option[Expression] = None): LogicalPlan = {
+    val newRelation = relation.table match {
+      case sparkTable @ SparkTable(table: InnerTable) =>
+        val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray)
+        relation.copy(table = sparkTable.copy(table = knownSplitsTable))
+      case _ => throw new RuntimeException()
+    }
+
+    condition match {
+      case Some(c) if c != TrueLiteral => Filter(c, newRelation)
+      case _ => newRelation
+    }
+  }
+
+  def selectWithDvMetaCols(data: DataFrame): DataFrame = {
+    selectWithAdditionalCols(data, DV_META_COLUMNS)
+  }
+
+  def selectWithRowLineageMetaCols(data: DataFrame): DataFrame = {
+    selectWithAdditionalCols(data, ROW_LINEAGE_META_COLUMNS)
+  }
+
+  private def selectWithAdditionalCols(data: DataFrame, additionalCols: Seq[String]): DataFrame = {
+    val dataColNames = data.schema.names
+    val mergedColNames = dataColNames ++ additionalCols.filterNot(dataColNames.contains)
+    data.select(mergedColNames.map(col): _*)
+  }
+}
+
+/** This wrapper is only used in java code, e.g. Procedure. */
+object ScanPlanHelper extends ScanPlanHelper {
+  def createNewScanPlan(
+      dataSplits: Array[DataSplit],
+      relation: DataSourceV2Relation): LogicalPlan = {
+    ScanPlanHelper.createNewScanPlan(dataSplits.toSeq, relation)
+  }
+}
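
To show how the new trait above is meant to be used, here is a hedged sketch only: ExampleScanPlanner, spark, splits, and relation are illustrative names, not part of the commit. A caller re-plans a relation over known splits and appends just the deletion-vector meta columns it needs:

    import org.apache.paimon.spark.util.ScanPlanHelper
    import org.apache.paimon.table.source.DataSplit

    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.PaimonUtils.createDataset
    import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

    // Hypothetical caller; the real callers in this commit (PaimonCommand,
    // MergeIntoPaimonTable) mix in ScanPlanHelper directly.
    object ExampleScanPlanner extends ScanPlanHelper {
      def dvMetaDataFrame(
          spark: SparkSession,
          splits: Seq[DataSplit],
          relation: DataSourceV2Relation): DataFrame = {
        // Re-plan the relation over the known splits; no extra filter condition here.
        val scanPlan = createNewScanPlan(splits, relation)
        // Append only __paimon_file_path and __paimon_row_index, instead of
        // exposing every metadata column as the removed createNewRelation did.
        selectWithDvMetaCols(createDataset(spark, scanPlan))
      }
    }

From Java, the same entry point is reached through the companion object, as CompactProcedure does above with ScanPlanHelper$.MODULE$.createNewScanPlan(split, relation).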
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index f3c2121860..013395d7c2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -175,7 +175,7 @@ class PaimonSparkTestBase
     paimonCatalog.getTable(Identifier.create(dbName, tableName)).asInstanceOf[FileStoreTable]
   }
 
-  protected def createRelationV2(tableName: String): LogicalPlan = {
+  protected def createRelationV2(tableName: String): DataSourceV2Relation = {
     val sparkTable = SparkTable(loadTable(tableName))
     DataSourceV2Relation.create(
       sparkTable,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index e5f1d0e131..925344d49d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -23,13 +23,11 @@ import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{PaimonUtils, Row}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.functions._
-import org.apache.spark.sql.paimon.Utils
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
 import org.junit.jupiter.api.Assertions
 
 import scala.collection.immutable
@@ -62,7 +60,7 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
                                |""".stripMargin)
       val optimizedPlan = Optimize.execute(query.queryExecution.analyzed)
 
-      val df = Utils.createDataFrame(spark, createRelationV2("T"))
+      val df = PaimonUtils.createDataset(spark, createRelationV2("T"))
       val mergedSubquery = df
         .select(
           toColumn(count(Literal(1))).as("cnt"),
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
new file mode 100644
index 0000000000..007b95e10b
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataSplit
+
+import org.apache.spark.sql.PaimonUtils.createDataset
+import org.apache.spark.sql.Row
+
+class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
+
+  test("ScanPlanHelper: create new scan plan and select with row lineage meta cols") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
+
+      val splits = getPaimonScan("SELECT * FROM t").getOriginSplits.map(_.asInstanceOf[DataSplit])
+      val newScanPlan = createNewScanPlan(splits, createRelationV2("t"))
+      val newDf = createDataset(spark, newScanPlan)
+
+      // select original df should not contain meta cols
+      checkAnswer(newDf, Seq(Row(11, "a"), Row(22, "b")))
+
+      // select df with row lineage meta cols
+      checkAnswer(selectWithRowLineageMetaCols(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
+
+      // select with row lineage meta cols twice should not add new more meta cols
+      checkAnswer(
+        selectWithRowLineageMetaCols(selectWithRowLineageMetaCols(newDf)),
+        Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
+
+      // select df already contains meta cols with row lineage
+      checkAnswer(
+        selectWithRowLineageMetaCols(newDf.select("_ROW_ID", "id")),
+        Seq(Row(0, 11, 1), Row(1, 22, 1)))
+    }
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
index 61a479b9f0..b5700ea8ef 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
@@ -18,9 +18,7 @@
 
 package org.apache.spark.sql.paimon
 
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.util.{Utils => SparkUtils}
 
 import java.io.File
@@ -35,9 +33,4 @@ object Utils {
   def waitUntilEventEmpty(spark: SparkSession): Unit = {
     spark.sparkContext.listenerBus.waitUntilEmpty()
   }
-
-  def createDataFrame(sparkSession: SparkSession, plan: LogicalPlan): DataFrame = {
-    SparkShimLoader.shim.classicApi.createDataset(sparkSession, plan)
-  }
-
 }

