This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 047856e33e [core][spark] Support reading row lineage metadata column (#5985)
047856e33e is described below
commit 047856e33e069777e5cbb599b71154da83ffa9eb
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Aug 1 10:11:14 2025 +0800
[core][spark] Support reading row lineage metadata column (#5985)
---
docs/content/spark/sql-query.md | 2 ++
.../apache/paimon/table/source/ReadBuilder.java | 3 +-
.../paimon/table/source/ReadBuilderImpl.java | 6 ----
.../scala/org/apache/paimon/spark/PaimonScan.scala | 4 +--
.../apache/paimon/spark/PaimonScanBuilder.scala | 4 +--
.../scala/org/apache/paimon/spark/PaimonScan.scala | 4 +--
.../paimon/spark/ColumnPruningAndPushDown.scala | 15 ++++++--
.../org/apache/paimon/spark/PaimonBaseScan.scala | 34 +++++++-----------
.../paimon/spark/PaimonBaseScanBuilder.scala | 4 +--
.../paimon/spark/PaimonRecordReaderIterator.scala | 16 +++++----
.../scala/org/apache/paimon/spark/PaimonScan.scala | 4 +--
.../apache/paimon/spark/PaimonScanBuilder.scala | 4 +--
.../org/apache/paimon/spark/PaimonSplitScan.scala | 6 ++--
.../scala/org/apache/paimon/spark/SparkTable.scala | 22 +++++++++---
.../paimon/spark/aggregate/LocalAggregator.scala | 7 +---
.../paimon/spark/commands/PaimonCommand.scala | 8 ++---
.../paimon/spark/schema/PaimonMetadataColumn.scala | 12 ++++++-
.../apache/paimon/spark/sql/RowLineageTest.scala | 42 ++++++++++++++++++++++
18 files changed, 127 insertions(+), 70 deletions(-)
diff --git a/docs/content/spark/sql-query.md b/docs/content/spark/sql-query.md
index 50718ff6ae..62a048bd9a 100644
--- a/docs/content/spark/sql-query.md
+++ b/docs/content/spark/sql-query.md
@@ -43,6 +43,8 @@ Paimon also supports reading some hidden metadata columns, currently supporting
- `__paimon_partition`: the partition of the record.
- `__paimon_bucket`: the bucket of the record.
- `__paimon_row_index`: the row index of the record.
+- `_ROW_ID`: the unique row id of the record (valid only when `row-tracking.enabled` is set to true).
+- `_SEQUENCE_NUMBER`: the sequence number of the record (valid only when `row-tracking.enabled` is set to true).
```sql
-- read all columns and the corresponding file path, partition, bucket, rowIndex of the record
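A minimal end-to-end sketch of the documented behaviour, mirroring the test added at the end of this commit; the table name `t` is illustrative and `spark` is assumed to be a session with the Paimon catalog configured:

```scala
// Row lineage columns are only resolvable when row tracking is enabled.
spark.sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
spark.sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")

// The hidden columns can be mixed freely with data columns, in any order.
spark.sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t").show()
spark.sql("SELECT _ROW_ID, data, _SEQUENCE_NUMBER, id FROM t").show()
```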
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 1f3d52b63c..e544182aa7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.Table;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
@@ -119,7 +118,7 @@ public interface ReadBuilder extends Serializable {
/**
* Push read row type to the reader, support nested row pruning.
*
-     * @param readType read row type, can be a pruned type from {@link Table#rowType()}
+ * @param readType read row type
* @since 1.0.0
*/
ReadBuilder withReadType(RowType readType);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 63be2e8765..3f8765f303 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -110,12 +110,6 @@ public class ReadBuilderImpl implements ReadBuilder {
@Override
public ReadBuilder withReadType(RowType readType) {
- RowType tableRowType = table.rowType();
- checkState(
- readType.isPrunedFrom(tableRowType),
-                "read row type must be a pruned type from table row type, read row type: %s, table row type: %s",
- readType,
- tableRowType);
this.readType = readType;
return this;
}
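The `isPrunedFrom` check is removed because a read type may now legitimately contain the row lineage fields, which are not part of `table.rowType()`. A hedged sketch of the intended call path, assuming a `table: Table` handle and the standard `newReadBuilder()` entry point:

```scala
// Sketch only: widen the read type with the lineage fields when row tracking is
// enabled, then push it to the reader. This call previously failed the
// "read row type must be a pruned type from table row type" check.
import org.apache.paimon.CoreOptions
import org.apache.paimon.table.{SpecialFields, Table}
import org.apache.paimon.types.RowType

def lineageAwareReadType(table: Table): RowType =
  if (CoreOptions.fromMap(table.options()).rowTrackingEnabled()) {
    SpecialFields.rowTypeWithRowLineage(table.rowType()) // appends _ROW_ID, _SEQUENCE_NUMBER
  } else {
    table.rowType()
  }

def newLineageAwareRead(table: Table) =
  table.newReadBuilder().withReadType(lineageAwareReadType(table)).newRead()
```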
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index d3a3c360ee..67d91652b8 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.InnerTable
import org.apache.spark.sql.PaimonUtils.fieldReference
import org.apache.spark.sql.connector.expressions.NamedReference
@@ -30,7 +30,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
case class PaimonScan(
- table: Table,
+ table: InnerTable,
requiredSchema: StructType,
filters: Seq[Predicate],
reservedFilters: Seq[Filter],
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 395f8707ab..8c2c53163d 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -19,14 +19,14 @@
package org.apache.paimon.spark
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate}
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.InnerTable
import org.apache.spark.sql.connector.read.SupportsPushDownFilters
import org.apache.spark.sql.sources.Filter
import scala.collection.mutable
-class PaimonScanBuilder(table: Table)
+class PaimonScanBuilder(table: InnerTable)
extends PaimonBaseScanBuilder(table)
with SupportsPushDownFilters {
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index ec589442e8..0c356d9b13 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark
import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.spark.sql.PaimonUtils.fieldReference
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
case class PaimonScan(
- table: Table,
+ table: InnerTable,
requiredSchema: StructType,
filters: Seq[Predicate],
reservedFilters: Seq[Filter],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
index f29c146b77..c64565d16a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
@@ -18,9 +18,10 @@
package org.apache.paimon.spark
+import org.apache.paimon.CoreOptions
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.{InnerTable, SpecialFields}
import org.apache.paimon.table.source.ReadBuilder
import org.apache.paimon.types.RowType
import org.apache.paimon.utils.Preconditions.checkState
@@ -30,12 +31,20 @@ import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.types.StructType
trait ColumnPruningAndPushDown extends Scan with Logging {
- def table: Table
+ def table: InnerTable
def requiredSchema: StructType
def filters: Seq[Predicate]
def pushDownLimit: Option[Int] = None
- lazy val tableRowType: RowType = table.rowType
+ lazy val tableRowType: RowType = {
+ val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
+ if (coreOptions.rowTrackingEnabled()) {
+ SpecialFields.rowTypeWithRowLineage(table.rowType())
+ } else {
+ table.rowType()
+ }
+ }
+
lazy val tableSchema: StructType = SparkTypeUtils.fromPaimonRowType(tableRowType)
final def partitionType: StructType = {
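With row tracking enabled, the scan now treats the lineage fields as part of the table row type, so they participate in column pruning and pushdown like ordinary fields. An illustrative check of the widened type; the sample schema `(id INT, data STRING)` is an assumption, only the appended field names come from `SpecialFields`:

```scala
import org.apache.paimon.table.SpecialFields
import org.apache.paimon.types.{DataTypes, RowType}

// Sample base schema, purely for illustration.
val base: RowType = RowType.builder()
  .field("id", DataTypes.INT())
  .field("data", DataTypes.STRING())
  .build()

// Expected to list: id, data, _ROW_ID, _SEQUENCE_NUMBER
println(SpecialFields.rowTypeWithRowLineage(base).getFieldNames)
```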
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index b0447c8830..90be4027f8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.predicate.Predicate
import org.apache.paimon.spark.metric.SparkMetricRegistry
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.spark.statistics.StatisticsHelper
-import org.apache.paimon.table.{DataTable, FileStoreTable, Table}
+import org.apache.paimon.table.{DataTable, InnerTable}
import org.apache.paimon.table.source.{InnerTableScan, Split}
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
@@ -38,7 +38,7 @@ import java.util.Optional
import scala.collection.JavaConverters._
abstract class PaimonBaseScan(
- table: Table,
+ table: InnerTable,
requiredSchema: StructType,
filters: Seq[Predicate],
reservedFilters: Seq[Filter],
@@ -106,30 +106,20 @@ abstract class PaimonBaseScan(
}
override def supportedCustomMetrics: Array[CustomMetric] = {
- val paimonMetrics: Array[CustomMetric] = table match {
- case _: FileStoreTable =>
- Array(
- PaimonNumSplitMetric(),
- PaimonSplitSizeMetric(),
- PaimonAvgSplitSizeMetric(),
- PaimonPlanningDurationMetric(),
- PaimonScannedManifestsMetric(),
- PaimonSkippedTableFilesMetric(),
- PaimonResultedTableFilesMetric()
- )
- case _ =>
- Array.empty[CustomMetric]
- }
+ val paimonMetrics = Array(
+ PaimonNumSplitMetric(),
+ PaimonSplitSizeMetric(),
+ PaimonAvgSplitSizeMetric(),
+ PaimonPlanningDurationMetric(),
+ PaimonScannedManifestsMetric(),
+ PaimonSkippedTableFilesMetric(),
+ PaimonResultedTableFilesMetric()
+ )
super.supportedCustomMetrics() ++ paimonMetrics
}
override def reportDriverMetrics(): Array[CustomTaskMetric] = {
- table match {
- case _: FileStoreTable =>
- paimonMetricsRegistry.buildSparkScanMetrics()
- case _ =>
- Array.empty[CustomTaskMetric]
- }
+ paimonMetricsRegistry.buildSparkScanMetrics()
}
override def description(): String = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
index 1db1784484..fe1e58a756 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -19,14 +19,14 @@
package org.apache.paimon.spark
import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.Table
+import org.apache.paimon.table.InnerTable
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
-abstract class PaimonBaseScanBuilder(table: Table)
+abstract class PaimonBaseScanBuilder(table: InnerTable)
extends ScanBuilder
with SupportsPushDownRequiredColumns
with Logging {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
index 4a71cdff88..0c5f08d6e5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
@@ -35,17 +35,17 @@ case class PaimonRecordReaderIterator(
split: Split)
extends CloseableIterator[PaimonInternalRow] {
+ private val needMetadata = metadataColumns.nonEmpty
+ private val metadataRow: GenericRow =
+    GenericRow.of(Array.fill(metadataColumns.size)(null.asInstanceOf[AnyRef]): _*)
+ private val joinedRow: JoinedRow = JoinedRow.join(null, metadataRow)
+
private var lastFilePath: Path = _
private var isFileRecordIterator: Boolean = false
private var currentIterator: RecordReader.RecordIterator[PaimonInternalRow] = readBatch()
private var advanced = false
private var currentResult: PaimonInternalRow = _
- private val needMetadata = metadataColumns.nonEmpty
- private val metadataRow: GenericRow =
-    GenericRow.of(Array.fill(metadataColumns.size)(null.asInstanceOf[AnyRef]): _*)
- private val joinedRow: JoinedRow = JoinedRow.join(null, metadataRow)
-
private def validateMetadataColumns(): Unit = {
if (needMetadata) {
if (!isFileRecordIterator || !split.isInstanceOf[DataSplit]) {
@@ -98,6 +98,11 @@ case class PaimonRecordReaderIterator(
case _ =>
isFileRecordIterator = false
}
+
+ if (iter != null) {
+ validateMetadataColumns()
+ }
+
iter
}
@@ -110,7 +115,6 @@ case class PaimonRecordReaderIterator(
val dataRow = currentIterator.next()
if (dataRow != null) {
if (needMetadata) {
- validateMetadataColumns()
updateMetadataRow(currentIterator.asInstanceOf[FileRecordIterator[PaimonInternalRow]])
currentResult = joinedRow.replace(dataRow, metadataRow)
} else {
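Metadata handling is now set up once per iterator and each batch is validated in `readBatch()` instead of on every row; per record, the data row is simply paired with a reusable metadata row. A conceptual sketch of that pairing, using the same row utilities as the code above (import paths, the two-column layout, and the literal values are assumptions for illustration):

```scala
import org.apache.paimon.data.{BinaryString, GenericRow, JoinedRow}

val dataRow     = GenericRow.of(11, BinaryString.fromString("a")) // stands in for the row read from the file
val metadataRow = GenericRow.of(null, null)                       // one slot per requested metadata column
val joinedRow   = JoinedRow.join(null, metadataRow)

// Per record: fill the metadata slots, then expose data + metadata as one row.
metadataRow.setField(0, java.lang.Long.valueOf(0L)) // e.g. _ROW_ID
metadataRow.setField(1, java.lang.Long.valueOf(1L)) // e.g. _SEQUENCE_NUMBER
val result = joinedRow.replace(dataRow, metadataRow)
```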
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index b2c9ae67aa..eef1787d99 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.predicate.Predicate
import org.apache.paimon.spark.commands.BucketExpression.quote
-import org.apache.paimon.table.{BucketMode, FileStoreTable, Table}
+import org.apache.paimon.table.{BucketMode, FileStoreTable, InnerTable, Table}
import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.spark.sql.PaimonUtils.fieldReference
@@ -35,7 +35,7 @@ import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
case class PaimonScan(
- table: Table,
+ table: InnerTable,
requiredSchema: StructType,
filters: Seq[Predicate],
reservedFilters: Seq[Filter],
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 5fe1737c0d..10910da298 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark
import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, PredicateBuilder}
import org.apache.paimon.spark.aggregate.{AggregatePushDownUtils, LocalAggregator}
-import org.apache.paimon.table.{FileStoreTable, Table}
+import org.apache.paimon.table.{FileStoreTable, InnerTable, Table}
import org.apache.paimon.table.source.DataSplit
import org.apache.spark.sql.PaimonUtils
@@ -32,7 +32,7 @@ import org.apache.spark.sql.sources.Filter
import scala.collection.JavaConverters._
import scala.collection.mutable
-class PaimonScanBuilder(table: Table)
+class PaimonScanBuilder(table: InnerTable)
extends PaimonBaseScanBuilder(table)
with SupportsPushDownV2Filters
with SupportsPushDownLimit
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index 7d0bf83155..e0fac87518 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
import org.apache.paimon.predicate.Predicate
-import org.apache.paimon.table.{KnownSplitsTable, Table}
+import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.spark.sql.connector.read.{Batch, Scan}
@@ -34,7 +34,7 @@ class PaimonSplitScanBuilder(table: KnownSplitsTable) extends PaimonScanBuilder(
/** For internal use only. */
case class PaimonSplitScan(
- table: Table,
+ table: InnerTable,
dataSplits: Array[DataSplit],
requiredSchema: StructType,
filters: Seq[Predicate])
@@ -61,7 +61,7 @@ case class PaimonSplitScan(
}
object PaimonSplitScan {
- def apply(table: Table, dataSplits: Array[DataSplit]): PaimonSplitScan = {
+  def apply(table: InnerTable, dataSplits: Array[DataSplit]): PaimonSplitScan = {
val requiredSchema = SparkTypeUtils.fromPaimonRowType(table.rowType)
new PaimonSplitScan(table, dataSplits, requiredSchema, Seq.empty)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index a3fac1db79..305a7191d8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -25,11 +25,11 @@ import org.apache.paimon.spark.catalog.functions.BucketFunction
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
-import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, KnownSplitsTable, Table}
+import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable, KnownSplitsTable, Table}
import org.apache.paimon.table.BucketMode.{BUCKET_UNAWARE, HASH_FIXED, POSTPONE_MODE}
import org.apache.paimon.utils.StringUtils
-import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsMetadataColumns, SupportsRead, SupportsWrite, TableCapability, TableCatalog}
+import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.{Expressions, Transform}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
@@ -39,6 +39,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
import java.util.{Collections, EnumSet => JEnumSet, HashMap => JHashMap, Map => JMap, Set => JSet}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
/** A spark [[org.apache.spark.sql.connector.catalog.Table]] for paimon. */
case class SparkTable(table: Table)
@@ -48,13 +49,14 @@ case class SparkTable(table: Table)
with SupportsMetadataColumns
with PaimonPartitionManagement {
+ lazy val coreOptions = new CoreOptions(table.options())
+
private lazy val useV2Write: Boolean = {
val v2WriteConfigured = OptionUtils.useV2Write()
v2WriteConfigured && supportsV2Write
}
private def supportsV2Write: Boolean = {
- val coreOptions = new CoreOptions(table.options())
coreOptions.bucketFunctionType() == BucketFunctionType.DEFAULT && {
table match {
case storeTable: FileStoreTable =>
@@ -118,20 +120,30 @@ case class SparkTable(table: Table)
override def metadataColumns: Array[MetadataColumn] = {
val partitionType = SparkTypeUtils.toSparkPartitionType(table)
- Array[MetadataColumn](
+
+ val _metadataColumns = ArrayBuffer[MetadataColumn](
PaimonMetadataColumn.FILE_PATH,
PaimonMetadataColumn.ROW_INDEX,
PaimonMetadataColumn.PARTITION(partitionType),
PaimonMetadataColumn.BUCKET
)
+
+ if (coreOptions.rowTrackingEnabled()) {
+ _metadataColumns.append(PaimonMetadataColumn.ROW_ID)
+ _metadataColumns.append(PaimonMetadataColumn.SEQUENCE_NUMBER)
+ }
+
+ _metadataColumns.toArray
}
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
table match {
case t: KnownSplitsTable =>
new PaimonSplitScanBuilder(t)
+ case _: InnerTable =>
+        new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap).asInstanceOf[InnerTable])
case _ =>
- new PaimonScanBuilder(table.copy(options.asCaseSensitiveMap))
+ throw new RuntimeException("Only InnerTable can be scanned.")
}
}
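Once `SparkTable` reports the extra metadata columns, they behave on the Spark side like the existing hidden columns and can be combined with them. A short usage sketch (table name `t` is illustrative):

```scala
// _ROW_ID / _SEQUENCE_NUMBER fail to resolve unless 'row-tracking.enabled' = 'true',
// because the columns are only reported for row-tracking tables.
spark.sql(
  "SELECT __paimon_file_path, __paimon_row_index, _ROW_ID, _SEQUENCE_NUMBER, id FROM t"
).show()
```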
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
index bb88aa669e..fb401c78e1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/LocalAggregator.scala
@@ -18,9 +18,7 @@
package org.apache.paimon.spark.aggregate
-import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
-import org.apache.paimon.schema.SchemaManager
import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.spark.data.SparkInternalRow
import org.apache.paimon.stats.SimpleStatsEvolutions
@@ -46,10 +44,7 @@ class LocalAggregator(table: FileStoreTable) {
private var aggFuncEvaluatorGetter: () => Seq[AggFuncEvaluator[_]] = _
private var isInitialized = false
private lazy val simpleStatsEvolutions = {
- val schemaManager = new SchemaManager(
- table.fileIO(),
- table.location(),
- CoreOptions.branch(table.schema().options()))
+ val schemaManager = table.schemaManager()
new SimpleStatsEvolutions(sid => schemaManager.schema(sid).fields(), table.schema().id())
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 12d0c66622..5ce4d89f12 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -28,7 +28,7 @@ import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.table.{BucketMode, FileStoreTable, KnownSplitsTable}
+import org.apache.paimon.table.{BucketMode, InnerTable, KnownSplitsTable}
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.utils.SerializationUtils
@@ -36,7 +36,7 @@ import org.apache.paimon.utils.SerializationUtils
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.catalyst.SQLConfHelper
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -160,9 +160,9 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
relation: DataSourceV2Relation): DataSourceV2Relation = {
assert(relation.table.isInstanceOf[SparkTable])
val sparkTable = relation.table.asInstanceOf[SparkTable]
- assert(sparkTable.table.isInstanceOf[FileStoreTable])
+ assert(sparkTable.table.isInstanceOf[InnerTable])
val knownSplitsTable =
-      KnownSplitsTable.create(sparkTable.table.asInstanceOf[FileStoreTable], splits.toArray)
+      KnownSplitsTable.create(sparkTable.table.asInstanceOf[InnerTable], splits.toArray)
val outputNames = relation.outputSet.map(_.name)
def isOutputColumn(colName: String) = {
val resolve = conf.resolver
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 991f6f9c98..4f3248c0ff 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.schema
import org.apache.paimon.spark.SparkTypeUtils
+import org.apache.paimon.table.SpecialFields
import org.apache.paimon.types.DataField
import org.apache.spark.sql.catalyst.expressions.AttributeReference
@@ -47,11 +48,14 @@ object PaimonMetadataColumn {
val FILE_PATH_COLUMN = "__paimon_file_path"
val PARTITION_COLUMN = "__paimon_partition"
val BUCKET_COLUMN = "__paimon_bucket"
+
val SUPPORTED_METADATA_COLUMNS: Seq[String] = Seq(
ROW_INDEX_COLUMN,
FILE_PATH_COLUMN,
PARTITION_COLUMN,
- BUCKET_COLUMN
+ BUCKET_COLUMN,
+ SpecialFields.ROW_ID.name(),
+ SpecialFields.SEQUENCE_NUMBER.name()
)
val ROW_INDEX: PaimonMetadataColumn =
@@ -63,6 +67,10 @@ object PaimonMetadataColumn {
}
val BUCKET: PaimonMetadataColumn =
PaimonMetadataColumn(Int.MaxValue - 103, BUCKET_COLUMN, IntegerType)
+ val ROW_ID: PaimonMetadataColumn =
+    PaimonMetadataColumn(Int.MaxValue - 104, SpecialFields.ROW_ID.name(), LongType)
+ val SEQUENCE_NUMBER: PaimonMetadataColumn =
+    PaimonMetadataColumn(Int.MaxValue - 105, SpecialFields.SEQUENCE_NUMBER.name(), LongType)
def get(metadataColumn: String, partitionType: StructType): PaimonMetadataColumn = {
metadataColumn match {
@@ -70,6 +78,8 @@ object PaimonMetadataColumn {
case FILE_PATH_COLUMN => FILE_PATH
case PARTITION_COLUMN => PARTITION(partitionType)
case BUCKET_COLUMN => BUCKET
+ case ROW_ID.name => ROW_ID
+ case SEQUENCE_NUMBER.name => SEQUENCE_NUMBER
case _ =>
throw new IllegalArgumentException(s"$metadataColumn metadata column is not supported.")
}
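The new names resolve through the same `get` lookup as the other hidden columns; a small sketch, where `partitionType` stands for the table's Spark partition `StructType`:

```scala
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.SpecialFields
import org.apache.spark.sql.types.StructType

def lineageColumns(partitionType: StructType): Seq[PaimonMetadataColumn] = Seq(
  PaimonMetadataColumn.get(SpecialFields.ROW_ID.name(), partitionType),          // id Int.MaxValue - 104, LongType
  PaimonMetadataColumn.get(SpecialFields.SEQUENCE_NUMBER.name(), partitionType)  // id Int.MaxValue - 105, LongType
) // any other name still throws IllegalArgumentException
```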
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
new file mode 100644
index 0000000000..e3ec1f4591
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowLineageTest.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import org.apache.spark.sql.Row
+
+class RowLineageTest extends PaimonSparkTestBase {
+
+ test("Row Lineage: read row lineage") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
+
+ checkAnswer(
+ sql("SELECT *, _ROW_ID, _SEQUENCE_NUMBER FROM t"),
+ Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1))
+ )
+ checkAnswer(
+ sql("SELECT _ROW_ID, data, _SEQUENCE_NUMBER, id FROM t"),
+ Seq(Row(0, "a", 1, 11), Row(1, "b", 1, 22))
+ )
+ }
+ }
+}