This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 047856e33e [core][spark] Support reading row lineage metadata column (#5985)
047856e33e is described below

commit 047856e33e069777e5cbb599b71154da83ffa9eb
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Aug 1 10:11:14 2025 +0800

    [core][spark] Support reading row lineage metadata column (#5985)
---
 docs/content/spark/sql-query.md                    |  2 ++
 .../apache/paimon/table/source/ReadBuilder.java    |  3 +-
 .../paimon/table/source/ReadBuilderImpl.java       |  6 ----
 .../scala/org/apache/paimon/spark/PaimonScan.scala |  4 +--
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  4 +--
 .../scala/org/apache/paimon/spark/PaimonScan.scala |  4 +--
 .../paimon/spark/ColumnPruningAndPushDown.scala    | 15 ++++++--
 .../org/apache/paimon/spark/PaimonBaseScan.scala   | 34 +++++++-----------
 .../paimon/spark/PaimonBaseScanBuilder.scala       |  4 +--
 .../paimon/spark/PaimonRecordReaderIterator.scala  | 16 +++++----
 .../scala/org/apache/paimon/spark/PaimonScan.scala |  4 +--
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  4 +--
 .../org/apache/paimon/spark/PaimonSplitScan.scala  |  6 ++--
 .../scala/org/apache/paimon/spark/SparkTable.scala | 22 +++++++++---
 .../paimon/spark/aggregate/LocalAggregator.scala   |  7 +---
 .../paimon/spark/commands/PaimonCommand.scala      |  8 ++---
 .../paimon/spark/schema/PaimonMetadataColumn.scala | 12 ++++++-
 .../apache/paimon/spark/sql/RowLineageTest.scala   | 42 ++++++++++++++++++++++
 18 files changed, 127 insertions(+), 70 deletions(-)

diff --git a/docs/content/spark/sql-query.md b/docs/content/spark/sql-query.md
index 50718ff6ae..62a048bd9a 100644
--- a/docs/content/spark/sql-query.md
+++ b/docs/content/spark/sql-query.md
@@ -43,6 +43,8 @@ Paimon also supports reading some hidden metadata columns, currently supporting
 - `__paimon_partition`: the partition of the record.
 - `__paimon_bucket`: the bucket of the record.
 - `__paimon_row_index`: the row index of the record.
+- `_ROW_ID`: the unique row id of the record (valid only when `row-tracking.enabled` is set to true).
+- `_SEQUENCE_NUMBER`: the sequence number of the record (valid only when `row-tracking.enabled` is set to true).
 
 ```sql
 -- read all columns and the corresponding file path, partition, bucket, rowIndex of the record
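
For reference, here is a minimal Spark (Scala) sketch of reading these two columns; it mirrors the RowLineageTest added at the end of this commit, and the table name `t` and the inserted values are only illustrative:

```scala
// Illustrative sketch: assumes a SparkSession with the Paimon catalog configured.
spark.sql(
  "CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
spark.sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")

// _ROW_ID and _SEQUENCE_NUMBER are hidden metadata columns; they only appear
// when selected explicitly.
spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t").show()
// Expected per the test below: (11, 'a', 0, 1), (22, 'b', 1, 1)
```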
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 1f3d52b63c..e544182aa7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.Table;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
 
@@ -119,7 +118,7 @@ public interface ReadBuilder extends Serializable {
     /**
      * Push read row type to the reader, support nested row pruning.
      *
-     * @param readType read row type, can be a pruned type from {@link Table#rowType()}
+     * @param readType read row type
      * @since 1.0.0
      */
     ReadBuilder withReadType(RowType readType);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 63be2e8765..3f8765f303 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -110,12 +110,6 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     @Override
     public ReadBuilder withReadType(RowType readType) {
-        RowType tableRowType = table.rowType();
-        checkState(
-                readType.isPrunedFrom(tableRowType),
-                "read row type must be a pruned type from table row type, read row type: %s, table row type: %s",
-                readType,
-                tableRowType);
         this.readType = readType;
         return this;
     }
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index d3a3c360ee..67d91652b8 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.sql.PaimonUtils.fieldReference
 import org.apache.spark.sql.connector.expressions.NamedReference
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType
 import scala.collection.JavaConverters._
 
 case class PaimonScan(
-    table: Table,
+    table: InnerTable,
     requiredSchema: StructType,
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 395f8707ab..8c2c53163d 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -19,14 +19,14 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.sql.connector.read.SupportsPushDownFilters
 import org.apache.spark.sql.sources.Filter
 
 import scala.collection.mutable
 
-class PaimonScanBuilder(table: Table)
+class PaimonScanBuilder(table: InnerTable)
   extends PaimonBaseScanBuilder(table)
   with SupportsPushDownFilters {
 
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index ec589442e8..0c356d9b13 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.apache.spark.sql.PaimonUtils.fieldReference
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType
 import scala.collection.JavaConverters._
 
 case class PaimonScan(
-    table: Table,
+    table: InnerTable,
     requiredSchema: StructType,
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index f29c146b77..c64565d16a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -18,9 +18,10 @@
 
 package org.apache.paimon.spark
 
+import org.apache.paimon.CoreOptions
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.{InnerTable, SpecialFields}
 import org.apache.paimon.table.source.ReadBuilder
 import org.apache.paimon.types.RowType
 import org.apache.paimon.utils.Preconditions.checkState
@@ -30,12 +31,20 @@ import org.apache.spark.sql.connector.read.Scan
 import org.apache.spark.sql.types.StructType
 
 trait ColumnPruningAndPushDown extends Scan with Logging {
-  def table: Table
+  def table: InnerTable
   def requiredSchema: StructType
   def filters: Seq[Predicate]
   def pushDownLimit: Option[Int] = None
 
-  lazy val tableRowType: RowType = table.rowType
+  lazy val tableRowType: RowType = {
+    val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
+    if (coreOptions.rowTrackingEnabled()) {
+      SpecialFields.rowTypeWithRowLineage(table.rowType())
+    } else {
+      table.rowType()
+    }
+  }
+
   lazy val tableSchema: StructType = SparkTypeUtils.fromPaimonRowType(tableRowType)
 
   final def partitionType: StructType = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index b0447c8830..90be4027f8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.spark.metric.SparkMetricRegistry
 import org.apache.paimon.spark.sources.PaimonMicroBatchStream
 import org.apache.paimon.spark.statistics.StatisticsHelper
-import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
+import org.apache.paimon.table.{DataTable, InnerTable}
 import org.apache.paimon.table.source.{InnerTableScan, Split}
 
 import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
@@ -38,7 +38,7 @@ import java.util.Optional
 import scala.collection.JavaConverters._
 
 abstract class PaimonBaseScan(
-    table: Table,
+    table: InnerTable,
     requiredSchema: StructType,
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
@@ -106,30 +106,20 @@ abstract class PaimonBaseScan(
   }
 
   override def supportedCustomMetrics: Array[CustomMetric] = {
-    val paimonMetrics: Array[CustomMetric] = table match {
-      case _: FileStoreTable =>
-        Array(
-          PaimonNumSplitMetric(),
-          PaimonSplitSizeMetric(),
-          PaimonAvgSplitSizeMetric(),
-          PaimonPlanningDurationMetric(),
-          PaimonScannedManifestsMetric(),
-          PaimonSkippedTableFilesMetric(),
-          PaimonResultedTableFilesMetric()
-        )
-      case _ =>
-        Array.empty[CustomMetric]
-    }
+    val paimonMetrics = Array(
+      PaimonNumSplitMetric(),
+      PaimonSplitSizeMetric(),
+      PaimonAvgSplitSizeMetric(),
+      PaimonPlanningDurationMetric(),
+      PaimonScannedManifestsMetric(),
+      PaimonSkippedTableFilesMetric(),
+      PaimonResultedTableFilesMetric()
+    )
     super.supportedCustomMetrics() ++ paimonMetrics
   }
 
   override def reportDriverMetrics(): Array[CustomTaskMetric] = {
-    table match {
-      case _: FileStoreTable =>
-        paimonMetricsRegistry.buildSparkScanMetrics()
-      case _ =>
-        Array.empty[CustomTaskMetric]
-    }
+    paimonMetricsRegistry.buildSparkScanMetrics()
   }
 
   override def description(): String = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 1db1784484..fe1e58a756 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -19,14 +19,14 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.InnerTable
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types.StructType
 
-abstract class PaimonBaseScanBuilder(table: Table)
+abstract class PaimonBaseScanBuilder(table: InnerTable)
   extends ScanBuilder
   with SupportsPushDownRequiredColumns
   with Logging {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
index 4a71cdff88..0c5f08d6e5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
@@ -35,17 +35,17 @@ case class PaimonRecordReaderIterator(
     split: Split)
   extends CloseableIterator[PaimonInternalRow] {
 
+  private val needMetadata = metadataColumns.nonEmpty
+  private val metadataRow: GenericRow =
+    GenericRow.of(Array.fill(metadataColumns.size)(null.asInstanceOf[AnyRef]): _*)
+  private val joinedRow: JoinedRow = JoinedRow.join(null, metadataRow)
+
   private var lastFilePath: Path = _
   private var isFileRecordIterator: Boolean = false
   private var currentIterator: RecordReader.RecordIterator[PaimonInternalRow] = readBatch()
   private var advanced = false
   private var currentResult: PaimonInternalRow = _
 
-  private val needMetadata = metadataColumns.nonEmpty
-  private val metadataRow: GenericRow =
-    GenericRow.of(Array.fill(metadataColumns.size)(null.asInstanceOf[AnyRef]): _*)
-  private val joinedRow: JoinedRow = JoinedRow.join(null, metadataRow)
-
   private def validateMetadataColumns(): Unit = {
     if (needMetadata) {
       if (!isFileRecordIterator || !split.isInstanceOf[DataSplit]) {
@@ -98,6 +98,11 @@ case class PaimonRecordReaderIterator(
       case _ =>
         isFileRecordIterator = false
     }
+
+    if (iter != null) {
+      validateMetadataColumns()
+    }
+
     iter
   }
 
@@ -110,7 +115,6 @@ case class PaimonRecordReaderIterator(
           val dataRow = currentIterator.next()
           if (dataRow != null) {
             if (needMetadata) {
-              validateMetadataColumns()
               updateMetadataRow(currentIterator.asInstanceOf[FileRecordIterator[PaimonInternalRow]])
               currentResult = joinedRow.replace(dataRow, metadataRow)
             } else {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index b2c9ae67aa..eef1787d99 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark
 import org.apache.paimon.CoreOptions.BucketFunctionType
 import org.apache.paimon.predicate.Predicate
 import org.apache.paimon.spark.commands.BucketExpression.quote
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable, Table}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.apache.spark.sql.PaimonUtils.fieldReference
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.StructType
 import scala.collection.JavaConverters._
 
 case class PaimonScan(
-    table: Table,
+    table: InnerTable,
     requiredSchema: StructType,
     filters: Seq[Predicate],
     reservedFilters: Seq[Filter],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 5fe1737c0d..10910da298 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, PredicateBuilder}
 import org.apache.paimon.spark.aggregate.{AggregatePushDownUtils, LocalAggregator}
-import org.apache.paimon.table.{FileStoreTable, Table}
+import org.apache.paimon.table.{FileStoreTable, InnerTable, Table}
 import org.apache.paimon.table.source.DataSplit
 
 import org.apache.spark.sql.PaimonUtils
@@ -32,7 +32,7 @@ import org.apache.spark.sql.sources.Filter
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-class PaimonScanBuilder(table: Table)
+class PaimonScanBuilder(table: InnerTable)
   extends PaimonBaseScanBuilder(table)
   with SupportsPushDownV2Filters
   with SupportsPushDownLimit
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 7d0bf83155..e0fac87518 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.{KnownSplitsTable, Table}
+import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
 
 import org.apache.spark.sql.connector.read.{Batch, Scan}
@@ -34,7 +34,7 @@ class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonScanBuilder(
 
 /** For internal use only. */
 case class PaimonSplitScan(
-    table: Table,
+    table: InnerTable,
     dataSplits: Array[DataSplit],
     requiredSchema: StructType,
     filters: Seq[Predicate])
@@ -61,7 +61,7 @@ case class PaimonSplitScan(
 }
 
 object PaimonSplitScan {
-  def apply(table: Table, dataSplits: Array[DataSplit]): PaimonSplitScan = {
+  def apply(table: InnerTable, dataSplits: Array[DataSplit]): PaimonSplitScan = {
     val requiredSchema = SparkTypeUtils.fromPaimonRowType(table.rowType)
     new PaimonSplitScan(table, dataSplits, requiredSchema, Seq.empty)
   }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index a3fac1db79..305a7191d8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -25,11 +25,11 @@ import org.apache.paimon.spark.catalog.functions.BucketFunction
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
-import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, KnownSplitsTable, Table}
+import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable, KnownSplitsTable, Table}
 import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
 import org.apache.paimon.utils.StringUtils
 
-import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
 /** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
 case class SparkTable(table: Table)
@@ -48,13 +49,14 @@ case class SparkTable(table: Table)
   with SupportsMetadataColumns
   with PaimonPartitionManagement {
 
+  lazy val coreOptions = new CoreOptions(table.options())
+
   private lazy val useV2Write: Boolean = {
     val v2WriteConfigured = OptionUtils.useV2Write()
     v2WriteConfigured && supportsV2Write
   }
 
   private def supportsV2Write: Boolean = {
-    val coreOptions = new CoreOptions(table.options())
     coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
       table match {
         case storeTable: FileStoreTable =>
@@ -118,20 +120,30 @@ case class SparkTable(table: Table)
 
   override def metadataColumns: Array[MetadataColumn] = {
     val partitionType = SparkTypeUtils.toSparkPartitionType(table)
-    Array[MetadataColumn](
+
+    val _metadataColumns = ArrayBuffer[MetadataColumn](
       PaimonMetadataColumn.FILE_PATH,
       PaimonMetadataColumn.ROW_INDEX,
       PaimonMetadataColumn.PARTITION(partitionType),
       PaimonMetadataColumn.BUCKET
     )
+
+    if (coreOptions.rowTrackingEnabled()) {
+      _metadataColumns.append(PaimonMetadataColumn.ROW_ID)
+      _metadataColumns.append(PaimonMetadataColumn.SEQUENCE_NUMBER)
+    }
+
+    _metadataColumns.toArray
   }
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
     table match {
       case t: KnownSplitsTable =>
         new PaimonSplitScanBuilder(t)
+      case _: InnerTable =>
+        new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable])
       case _ =>
-        new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap))
+        throw new RuntimeException("Only InnerTable can be scanned.")
     }
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
index bb88aa669e..fb401c78e1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
@@ -18,9 +18,7 @@
 
 package org.apache.paimon.spark.aggregate
 
-import org.apache.paimon.CoreOptions
 import org.apache.paimon.data.BinaryRow
-import org.apache.paimon.schema.SchemaManager
 import org.apache.paimon.spark.SparkTypeUtils
 import org.apache.paimon.spark.data.SparkInternalRow
 import org.apache.paimon.stats.SimpleStatsEvolutions
@@ -46,10 +44,7 @@ class LocalAggregator(table: FileStoreTable) {
   private var aggFuncEvaluatorGetter: () => Seq[AggFuncEvaluator[_]] = _
   private var isInitialized = false
   private lazy val simpleStatsEvolutions = {
-    val schemaManager = new SchemaManager(
-      table.fileIO(),
-      table.location(),
-      CoreOptions.branch(table.schema().options()))
+    val schemaManager = table.schemaManager()
     new SimpleStatsEvolutions(sid => schemaManager.schema(sid).fields(), table.schema().id())
   }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 12d0c66622..5ce4d89f12 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -28,7 +28,7 @@ import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.table.{BucketMode, FileStoreTable, KnownSplitsTable}
+import org.apache.paimon.table.{BucketMode, InnerTable, KnownSplitsTable}
 import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.utils.SerializationUtils
@@ -36,7 +36,7 @@ import org.apache.paimon.utils.SerializationUtils
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -160,9 +160,9 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
       relation: DataSourceV2Relation): DataSourceV2Relation = {
     assert(relation.table.isInstanceOf[SparkTable])
     val sparkTable = relation.table.asInstanceOf[SparkTable]
-    assert(sparkTable.table.isInstanceOf[FileStoreTable])
+    assert(sparkTable.table.isInstanceOf[InnerTable])
     val knownSplitsTable =
-      KnownSplitsTable.create(sparkTable.table.asInstanceOf[FileStoreTable], splits.toArray)
+      KnownSplitsTable.create(sparkTable.table.asInstanceOf[InnerTable], splits.toArray)
     val outputNames = relation.outputSet.map(_.name)
     def isOutputColumn(colName: String) = {
       val resolve = conf.resolver
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 991f6f9c98..4f3248c0ff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.schema
 
 import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.table.SpecialFields
 import org.apache.paimon.types.DataField
 
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -47,11 +48,14 @@ object PaimonMetadataColumn {
   val FILE_PATH_COLUMN = "__paimon_file_path"
   val PARTITION_COLUMN = "__paimon_partition"
   val BUCKET_COLUMN = "__paimon_bucket"
+
   val SUPPORTED_METADATA_COLUMNS: Seq[String] = Seq(
     ROW_INDEX_COLUMN,
     FILE_PATH_COLUMN,
     PARTITION_COLUMN,
-    BUCKET_COLUMN
+    BUCKET_COLUMN,
+    SpecialFields.ROW_ID.name(),
+    SpecialFields.SEQUENCE_NUMBER.name()
   )
 
   val ROW_INDEX: PaimonMetadataColumn =
@@ -63,6 +67,10 @@ object PaimonMetadataColumn {
   }
   val BUCKET: PaimonMetadataColumn =
     PaimonMetadataColumn(Int.MaxValue - 103, BUCKET_COLUMN, IntegerType)
+  val ROW_ID: PaimonMetadataColumn =
+    PaimonMetadataColumn(Int.MaxValue - 104, SpecialFields.ROW_ID.name(), LongType)
+  val SEQUENCE_NUMBER: PaimonMetadataColumn =
+    PaimonMetadataColumn(Int.MaxValue - 105, SpecialFields.SEQUENCE_NUMBER.name(), LongType)
 
   def get(metadataColumn: String, partitionType: StructType): PaimonMetadataColumn = {
     metadataColumn match {
@@ -70,6 +78,8 @@ object PaimonMetadataColumn {
       case FILE_PATH_COLUMN => FILE_PATH
       case PARTITION_COLUMN => PARTITION(partitionType)
       case BUCKET_COLUMN => BUCKET
+      case ROW_ID.name => ROW_ID
+      case SEQUENCE_NUMBER.name => SEQUENCE_NUMBER
       case _ =>
         throw new IllegalArgumentException(s"$metadataColumn metadata column is not supported.")
     }
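
As a side note, here is a small sketch of how the two new column definitions above resolve by name; the empty partition type is only a placeholder for an unpartitioned table:

```scala
// Sketch only: resolving the new row lineage metadata columns added above.
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.SpecialFields
import org.apache.spark.sql.types.StructType

val partitionType = new StructType() // placeholder partition type
val rowId = PaimonMetadataColumn.get(SpecialFields.ROW_ID.name(), partitionType)
val seqNum = PaimonMetadataColumn.get(SpecialFields.SEQUENCE_NUMBER.name(), partitionType)
// Both resolve to LongType metadata columns; any other name still throws
// IllegalArgumentException, as in the match above.
```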
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
new file mode 100644
index 0000000000..e3ec1f4591
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class RowLineageTest extends PaimonSparkTestBase {
+
+  test("Row Lineage: read row lineage") {
+    withTable("t") {
+      sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
+      sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
+
+      checkAnswer(
+        sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+        Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1))
+      )
+      checkAnswer(
+        sql("SELECT _ROW_ID, data, _SEQUENCE_NUMBER, id FROM t"),
+        Seq(Row(0, "a", 1, 11), Row(1, "b", 1, 22))
+      )
+    }
+  }
+}
