This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d64716936d [spark] Only read necessary meta columns when building new dataframe (#6084)
d64716936d is described below
commit d64716936d606d49fc8e52ef82bb9831b3cd81d8
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Aug 18 15:18:08 2025 +0800
[spark] Only read necessary meta columns when building new dataframe (#6084)
---
.../paimon/spark/catalyst/Compatibility.scala | 11 +--
.../paimon/spark/catalyst/Compatibility.scala | 11 +--
.../paimon/spark/procedure/CompactProcedure.java | 9 +--
.../org/apache/paimon/spark/PaimonSplitScan.scala | 7 --
.../paimon/spark/catalyst/Compatibility.scala | 11 +--
.../commands/DeleteFromPaimonTableCommand.scala | 2 +-
.../spark/commands/MergeIntoPaimonTable.scala | 46 ++++++-----
.../paimon/spark/commands/PaimonCommand.scala | 92 +++++-----------------
.../spark/commands/UpdatePaimonTableCommand.scala | 26 +++---
.../paimon/spark/schema/PaimonMetadataColumn.scala | 21 +++--
.../apache/paimon/spark/util/ScanPlanHelper.scala | 76 ++++++++++++++++++
.../apache/paimon/spark/PaimonSparkTestBase.scala | 2 +-
.../spark/sql/PaimonOptimizationTestBase.scala | 6 +-
.../paimon/spark/util/ScanPlanHelperTest.scala | 55 +++++++++++++
.../scala/org/apache/spark/sql/paimon/Utils.scala | 9 +--
15 files changed, 213 insertions(+), 171 deletions(-)
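For reviewers, the core rule this change centralizes — keep the data columns and append only the meta columns a command actually needs, never duplicating ones already present — can be seen in isolation with plain Spark. The sketch below is illustrative only and is not part of the commit; the object name, table data, and column values are made up, while the helper mirrors the selectWithAdditionalCols logic added in ScanPlanHelper.scala.

// Self-contained sketch (plain Spark, no Paimon dependency) of the
// "append only the missing meta columns" pattern. Illustrative only.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object OnlyNecessaryMetaColsSketch {
  // Keep every existing column and add only the requested columns
  // that the DataFrame does not already contain.
  def selectWithAdditionalCols(data: DataFrame, additionalCols: Seq[String]): DataFrame = {
    val dataColNames = data.schema.names
    val mergedColNames = dataColNames ++ additionalCols.filterNot(dataColNames.contains)
    data.select(mergedColNames.map(col): _*)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("meta-cols-sketch").getOrCreate()
    import spark.implicits._

    // Stand-in for a scan that already exposes the two deletion-vector meta columns.
    val scanned = Seq((11, "a", "f1.parquet", 0L), (22, "b", "f2.parquet", 1L))
      .toDF("id", "data", "__paimon_file_path", "__paimon_row_index")

    val once = selectWithAdditionalCols(scanned, Seq("__paimon_file_path", "__paimon_row_index"))
    val twice = selectWithAdditionalCols(once, Seq("__paimon_file_path", "__paimon_row_index"))

    // Repeated selection keeps a single copy of each meta column.
    assert(once.columns.toSeq == twice.columns.toSeq)
    spark.stop()
  }
}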
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b552bcd1e7..02e6efa6c6 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,23 +18,14 @@
package org.apache.paimon.spark.catalyst
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
object Compatibility {
- def createDataSourceV2ScanRelation(
- relation: DataSourceV2Relation,
- scan: Scan,
- output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
- DataSourceV2ScanRelation(relation, scan, output)
- }
-
def withNewQuery(o: V2WriteCommand, query: LogicalPlan): V2WriteCommand = {
o.withNewQuery(query)
}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index b552bcd1e7..02e6efa6c6 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,23 +18,14 @@
package org.apache.paimon.spark.catalyst
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
object Compatibility {
- def createDataSourceV2ScanRelation(
- relation: DataSourceV2Relation,
- scan: Scan,
- output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
- DataSourceV2ScanRelation(relation, scan, output)
- }
-
def withNewQuery(o: V2WriteCommand, query: LogicalPlan): V2WriteCommand = {
o.withNewQuery(query)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 8e299b5200..dd36e2f570 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -29,12 +29,11 @@ import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.spark.PaimonSplitScan;
import org.apache.paimon.spark.SparkUtils;
-import org.apache.paimon.spark.catalyst.Compatibility;
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionUtils;
import org.apache.paimon.spark.commands.PaimonSparkWriter;
import org.apache.paimon.spark.sort.TableSorter;
+import org.apache.paimon.spark.util.ScanPlanHelper$;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.SpecialFields;
@@ -508,10 +507,8 @@ public class CompactProcedure extends BaseProcedure {
Dataset<Row> dataset =
PaimonUtils.createDataset(
spark(),
- Compatibility.createDataSourceV2ScanRelation(
- relation,
- PaimonSplitScan.apply(table, split),
- relation.output()));
+ ScanPlanHelper$.MODULE$.createNewScanPlan(
+ split, relation));
return sorter.sort(dataset);
})
.reduce(Dataset::union)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index e0fac87518..91492de4e4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -59,10 +59,3 @@ case class PaimonSplitScan(
s"PaimonSplitScan: [${table.name}]" + pushedFiltersStr
}
}
-
-object PaimonSplitScan {
- def apply(table: InnerTable, dataSplits: Array[DataSplit]): PaimonSplitScan = {
- val requiredSchema = SparkTypeUtils.fromPaimonRowType(table.rowType)
- new PaimonSplitScan(table, dataSplits, requiredSchema, Seq.empty)
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
index 5f78cda21b..1fc260fc19 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/Compatibility.scala
@@ -18,23 +18,14 @@
package org.apache.paimon.spark.catalyst
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Cast, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2WriteCommand}
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
-import org.apache.spark.sql.connector.read.Scan
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, DataSourceV2ScanRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
object Compatibility {
- def createDataSourceV2ScanRelation(
- relation: DataSourceV2Relation,
- scan: Scan,
- output: Seq[AttributeReference]): DataSourceV2ScanRelation = {
- DataSourceV2ScanRelation(relation, scan, output)
- }
-
def withNewQuery(o: V2WriteCommand, query: LogicalPlan): V2WriteCommand = {
o.withNewQuery(query)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 55a387dd0f..b6045b5219 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -143,7 +143,7 @@ case class DeleteFromPaimonTableCommand(
// Step3: the smallest range of data files that need to be rewritten.
val (touchedFiles, newRelation) =
- createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
+ extractFilesAndCreateNewScan(touchedFilePaths, dataFilePathToMeta, relation)
// Step4: build a dataframe that contains the unchanged data, and write out them.
val toRewriteScanRelation = Filter(Not(condition), newRelation)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index d54639310b..6b170a39b4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -22,7 +22,7 @@ import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
-import org.apache.paimon.spark.schema.PaimonMetadataColumn.{FILE_PATH, FILE_PATH_COLUMN, ROW_INDEX, ROW_INDEX_COLUMN}
+import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
import org.apache.paimon.table.{FileStoreTable, SpecialFields}
import org.apache.paimon.table.sink.CommitMessage
@@ -102,19 +102,14 @@ case class MergeIntoPaimonTable(
if (deletionVectorsEnabled) {
// Step2: generate dataset that should contains ROW_KIND, FILE_PATH, ROW_INDEX columns
- val metadataCols = Seq(FILE_PATH, ROW_INDEX)
- val filteredRelation = createDataset(
+ val filteredDf = createDataset(
sparkSession,
- createNewScanPlan(
- candidateDataSplits,
- targetOnlyCondition.getOrElse(TrueLiteral),
- relation,
- metadataCols))
+ createNewScanPlan(candidateDataSplits, relation, targetOnlyCondition))
val ds = constructChangedRows(
sparkSession,
- filteredRelation,
+ selectWithDvMetaCols(filteredDf),
remainDeletedRow = true,
- extraMetadataCols = metadataCols)
+ extraMetadataCols = dvMetaCols)
ds.cache()
try {
@@ -168,24 +163,33 @@ case class MergeIntoPaimonTable(
filePathsToRead --= noMatchedBySourceFilePaths
}
- val (filesToRewritten, touchedFileRelation) =
- createNewRelation(filePathsToRewritten.toArray, dataFilePathToMeta, relation)
- val (_, unTouchedFileRelation) =
- createNewRelation(filePathsToRead.toArray, dataFilePathToMeta, relation)
+ val (filesToRewritten, filesToRewrittenScan) =
+ extractFilesAndCreateNewScan(filePathsToRewritten.toArray, dataFilePathToMeta, relation)
+ val (_, filesToReadScan) =
+ extractFilesAndCreateNewScan(filePathsToRead.toArray, dataFilePathToMeta, relation)
+
+ // If no files need to be rewritten, no need to write row lineage
+ val writeRowLineage = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
// Add FILE_TOUCHED_COL to mark the row as coming from the touched file, if the row has not been
// modified and was from touched file, it should be kept too.
- val targetDSWithFileTouchedCol = createDataset(sparkSession, touchedFileRelation)
- .withColumn(FILE_TOUCHED_COL, lit(true))
- .union(createDataset(sparkSession, unTouchedFileRelation)
- .withColumn(FILE_TOUCHED_COL, lit(false)))
+ var filesToRewrittenDS =
+ createDataset(sparkSession, filesToRewrittenScan).withColumn(FILE_TOUCHED_COL, lit(true))
+ if (writeRowLineage) {
+ filesToRewrittenDS = selectWithRowLineageMetaCols(filesToRewrittenDS)
+ }
- // If no files need to be rewritten, no need to write row lineage
- val writeRowLineage = coreOptions.rowTrackingEnabled() && filesToRewritten.nonEmpty
+ var filesToReadDS =
+ createDataset(sparkSession, filesToReadScan).withColumn(FILE_TOUCHED_COL, lit(false))
+ if (writeRowLineage) {
+ // For filesToReadScan we don't need to read row lineage meta cols, just add placeholders
+ ROW_LINEAGE_META_COLUMNS.foreach(
+ c => filesToReadDS = filesToReadDS.withColumn(c, lit(null)))
+ }
val toWriteDS = constructChangedRows(
sparkSession,
- targetDSWithFileTouchedCol,
+ filesToRewrittenDS.union(filesToReadDS),
writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
val writer = if (writeRowLineage) {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 4fe0678951..f628f31f16 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -23,12 +23,11 @@ import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, BitmapDeletion
import org.apache.paimon.fs.Path
import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement,
IndexIncrement}
-import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
-import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.table.{BucketMode, InnerTable, KnownSplitsTable}
+import org.apache.paimon.spark.util.ScanPlanHelper
+import org.apache.paimon.table.BucketMode
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.utils.SerializationUtils
@@ -38,8 +37,9 @@ import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{Filter => FilterLogicalNode, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.col
import java.net.URI
import java.util.Collections
@@ -47,7 +47,11 @@ import java.util.Collections
import scala.collection.JavaConverters._
/** Helper trait for all paimon commands. */
-trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLConfHelper {
+trait PaimonCommand
+ extends WithFileStoreTable
+ with ExpressionHelper
+ with ScanPlanHelper
+ with SQLConfHelper {
lazy val dvSafeWriter: PaimonSparkWriter = {
if (table.primaryKeys().isEmpty && table.bucketMode() == BucketMode.HASH_FIXED) {
@@ -102,7 +106,7 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
}
}
- val filteredRelation = createNewScanPlan(candidateDataSplits, condition, relation)
+ val filteredRelation = createNewScanPlan(candidateDataSplits, relation, Some(condition))
findTouchedFiles(createDataset(sparkSession, filteredRelation), sparkSession)
}
@@ -119,65 +123,18 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
.map(relativePath)
}
- protected def createNewScanPlan(
- candidateDataSplits: Seq[DataSplit],
- condition: Expression,
- relation: DataSourceV2Relation,
- metadataColumns: Seq[PaimonMetadataColumn]): LogicalPlan = {
- val newRelation = createNewScanPlan(candidateDataSplits, condition, relation)
- val resolvedMetadataColumns = metadataColumns.map {
- col =>
- val attr = newRelation.resolve(col.name :: Nil, conf.resolver)
- assert(attr.isDefined)
- attr.get
- }
- Project(relation.output ++ resolvedMetadataColumns, newRelation)
- }
-
- protected def createNewScanPlan(
- candidateDataSplits: Seq[DataSplit],
- condition: Expression,
- relation: DataSourceV2Relation): LogicalPlan = {
- val newRelation = createNewRelation(candidateDataSplits, relation)
- FilterLogicalNode(condition, newRelation)
- }
-
- protected def createNewRelation(
+ protected def extractFilesAndCreateNewScan(
filePaths: Array[String],
filePathToMeta: Map[String, SparkDataFileMeta],
- relation: DataSourceV2Relation): (Array[SparkDataFileMeta], DataSourceV2Relation) = {
+ relation: DataSourceV2Relation): (Array[SparkDataFileMeta], LogicalPlan) = {
val files = filePaths.map(
file => filePathToMeta.getOrElse(file, throw new RuntimeException(s"Missing file: $file")))
val touchedDataSplits =
SparkDataFileMeta.convertToDataSplits(files, rawConvertible = true, fileStore.pathFactory())
- val newRelation = createNewRelation(touchedDataSplits, relation)
+ val newRelation = createNewScanPlan(touchedDataSplits, relation)
(files, newRelation)
}
- protected def createNewRelation(
- splits: Seq[DataSplit],
- relation: DataSourceV2Relation): DataSourceV2Relation = {
- assert(relation.table.isInstanceOf[SparkTable])
- val sparkTable = relation.table.asInstanceOf[SparkTable]
- assert(sparkTable.table.isInstanceOf[InnerTable])
- val knownSplitsTable =
- KnownSplitsTable.create(sparkTable.table.asInstanceOf[InnerTable], splits.toArray)
- val outputNames = relation.outputSet.map(_.name)
- def isOutputColumn(colName: String) = {
- val resolve = conf.resolver
- outputNames.exists(resolve(colName, _))
- }
- val appendMetaColumns = sparkTable.metadataColumns
- .map(_.asInstanceOf[PaimonMetadataColumn].toAttribute)
- .filter(col => !isOutputColumn(col.name))
- // We re-plan the relation to skip analyze phase, so we should append needed
- // metadata columns manually and let Spark do column pruning during optimization.
- relation.copy(
- table = relation.table.asInstanceOf[SparkTable].copy(table = knownSplitsTable),
- output = relation.output ++ appendMetaColumns
- )
- }
-
/** Notice that, the key is a relative path, not just the file name. */
protected def candidateFileMap(
candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
@@ -196,34 +153,25 @@ trait PaimonCommand extends WithFileStoreTable with ExpressionHelper with SQLCon
condition: Expression,
relation: DataSourceV2Relation,
sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
- val filteredRelation = createNewScanPlan(candidateDataSplits, condition, relation)
- val dataWithMetadataColumns = createDataset(sparkSession, filteredRelation)
- collectDeletionVectors(dataFilePathToMeta, dataWithMetadataColumns, sparkSession)
+ val filteredRelation =
+ createNewScanPlan(candidateDataSplits, relation, Some(condition))
+ val dataset = createDataset(sparkSession, filteredRelation)
+ collectDeletionVectors(dataFilePathToMeta, dataset, sparkSession)
}
protected def collectDeletionVectors(
dataFilePathToMeta: Map[String, SparkDataFileMeta],
- dataWithMetadataColumns: Dataset[Row],
+ dataset: Dataset[Row],
sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
import sparkSession.implicits._
-
- val resolver = sparkSession.sessionState.conf.resolver
- Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN).foreach {
- metadata =>
- dataWithMetadataColumns.schema
- .find(field => resolver(field.name, metadata))
- .orElse(throw new RuntimeException(
- "This input dataset doesn't contains the required metadata
columns: __paimon_file_path and __paimon_row_index."))
- }
-
val dataFileToPartitionAndBucket =
dataFilePathToMeta.mapValues(meta => (meta.partition, meta.bucket)).toArray
val my_table = table
val location = my_table.location
val dvBitmap64 = my_table.coreOptions().deletionVectorBitmap64()
- dataWithMetadataColumns
- .select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
+ dataset
+ .select(DV_META_COLUMNS.map(col): _*)
.as[(String, Long)]
.groupByKey(_._1)
.mapGroups {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index e7d43aa936..6b47cd74f8 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.{FileStoreTable, SpecialFields}
import org.apache.paimon.table.sink.CommitMessage
@@ -30,7 +31,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, Project, SupportsSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, LogicalPlan, Project, SupportsSubquery}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.{col, lit}
@@ -77,7 +78,7 @@ case class UpdatePaimonTableCommand(
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
if (candidateDataSplits.isEmpty) {
- // no data spilt need to be rewrote
+ // no data spilt need to be rewritten
logDebug("No file need to rewrote. It's an empty Commit.")
Seq.empty[CommitMessage]
} else {
@@ -114,12 +115,12 @@ case class UpdatePaimonTableCommand(
// Step3: the smallest range of data files that need to be rewritten.
val (touchedFiles, touchedFileRelation) =
- createNewRelation(touchedFilePaths, dataFilePathToMeta, relation)
+ extractFilesAndCreateNewScan(touchedFilePaths, dataFilePathToMeta, relation)
// Step4: build a dataframe that contains the unchanged and updated data, and write out them.
val addCommitMessage = writeUpdatedAndUnchangedData(sparkSession, touchedFileRelation)
- // Step5: convert the deleted files that need to be wrote to commit message.
+ // Step5: convert the deleted files that need to be written to commit message.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
addCommitMessage ++ deletedCommitMessage
@@ -135,19 +136,14 @@ case class UpdatePaimonTableCommand(
toColumn(update).as(origin.name, origin.metadata)
}
- val toUpdateScanRelation = createNewRelation(touchedDataSplits, relation)
- val newPlan = if (condition == TrueLiteral) {
- toUpdateScanRelation
- } else {
- Filter(condition, toUpdateScanRelation)
- }
- val data = createDataset(sparkSession, newPlan).select(updateColumns: _*)
+ val toUpdateScanRelation = createNewScanPlan(touchedDataSplits, relation, Some(condition))
+ val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
dvSafeWriter.write(data)
}
private def writeUpdatedAndUnchangedData(
sparkSession: SparkSession,
- toUpdateScanRelation: DataSourceV2Relation): Seq[CommitMessage] = {
+ toUpdateScanRelation: LogicalPlan): Seq[CommitMessage] = {
var updateColumns = updateExpressions.zip(relation.output).map {
case (update, origin) =>
val updated = optimizedIf(condition, update, origin)
@@ -156,13 +152,13 @@ case class UpdatePaimonTableCommand(
if (coreOptions.rowTrackingEnabled()) {
updateColumns ++= Seq(
- col(SpecialFields.ROW_ID.name()),
+ col(ROW_ID_COLUMN),
toColumn(
optimizedIf(
condition,
Literal(null),
- toExpression(sparkSession, col(SpecialFields.SEQUENCE_NUMBER.name()))))
- .as(SpecialFields.SEQUENCE_NUMBER.name())
+ toExpression(sparkSession, col(SEQUENCE_NUMBER_COLUMN))))
+ .as(SEQUENCE_NUMBER_COLUMN)
)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 4f3248c0ff..4bf6b6c743 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -48,14 +48,19 @@ object PaimonMetadataColumn {
val FILE_PATH_COLUMN = "__paimon_file_path"
val PARTITION_COLUMN = "__paimon_partition"
val BUCKET_COLUMN = "__paimon_bucket"
+ val ROW_ID_COLUMN = SpecialFields.ROW_ID.name()
+ val SEQUENCE_NUMBER_COLUMN = SpecialFields.SEQUENCE_NUMBER.name()
+
+ val DV_META_COLUMNS: Seq[String] = Seq(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
+ val ROW_LINEAGE_META_COLUMNS: Seq[String] = Seq(ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN)
val SUPPORTED_METADATA_COLUMNS: Seq[String] = Seq(
ROW_INDEX_COLUMN,
FILE_PATH_COLUMN,
PARTITION_COLUMN,
BUCKET_COLUMN,
- SpecialFields.ROW_ID.name(),
- SpecialFields.SEQUENCE_NUMBER.name()
+ ROW_ID_COLUMN,
+ SEQUENCE_NUMBER_COLUMN
)
val ROW_INDEX: PaimonMetadataColumn =
@@ -68,9 +73,13 @@ object PaimonMetadataColumn {
val BUCKET: PaimonMetadataColumn =
PaimonMetadataColumn(Int.MaxValue - 103, BUCKET_COLUMN, IntegerType)
val ROW_ID: PaimonMetadataColumn =
- PaimonMetadataColumn(Int.MaxValue - 104, SpecialFields.ROW_ID.name(), LongType)
+ PaimonMetadataColumn(Int.MaxValue - 104, ROW_ID_COLUMN, LongType)
val SEQUENCE_NUMBER: PaimonMetadataColumn =
- PaimonMetadataColumn(Int.MaxValue - 105, SpecialFields.SEQUENCE_NUMBER.name(), LongType)
+ PaimonMetadataColumn(Int.MaxValue - 105, SEQUENCE_NUMBER_COLUMN, LongType)
+
+ def dvMetaCols: Seq[PaimonMetadataColumn] = Seq(FILE_PATH, ROW_INDEX)
+
+ def rowLineageMetaCols: Seq[PaimonMetadataColumn] = Seq(ROW_ID, SEQUENCE_NUMBER)
def get(metadataColumn: String, partitionType: StructType): PaimonMetadataColumn = {
metadataColumn match {
@@ -78,8 +87,8 @@ object PaimonMetadataColumn {
case FILE_PATH_COLUMN => FILE_PATH
case PARTITION_COLUMN => PARTITION(partitionType)
case BUCKET_COLUMN => BUCKET
- case ROW_ID.name => ROW_ID
- case SEQUENCE_NUMBER.name => SEQUENCE_NUMBER
+ case ROW_ID_COLUMN => ROW_ID
+ case SEQUENCE_NUMBER_COLUMN => SEQUENCE_NUMBER
case _ =>
throw new IllegalArgumentException(s"$metadataColumn metadata column
is not supported.")
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
new file mode 100644
index 0000000000..437a563ca8
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/ScanPlanHelper.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.spark.SparkTable
+import org.apache.paimon.spark.schema.PaimonMetadataColumn.{DV_META_COLUMNS, ROW_LINEAGE_META_COLUMNS}
+import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
+import org.apache.paimon.table.source.DataSplit
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.functions.col
+
+trait ScanPlanHelper extends SQLConfHelper {
+
+ /** Create a new scan plan from a relation with the given data splits, condition(optional). */
+ def createNewScanPlan(
+ dataSplits: Seq[DataSplit],
+ relation: DataSourceV2Relation,
+ condition: Option[Expression] = None): LogicalPlan = {
+ val newRelation = relation.table match {
+ case sparkTable @ SparkTable(table: InnerTable) =>
+ val knownSplitsTable = KnownSplitsTable.create(table, dataSplits.toArray)
+ relation.copy(table = sparkTable.copy(table = knownSplitsTable))
+ case _ => throw new RuntimeException()
+ }
+
+ condition match {
+ case Some(c) if c != TrueLiteral => Filter(c, newRelation)
+ case _ => newRelation
+ }
+ }
+
+ def selectWithDvMetaCols(data: DataFrame): DataFrame = {
+ selectWithAdditionalCols(data, DV_META_COLUMNS)
+ }
+
+ def selectWithRowLineageMetaCols(data: DataFrame): DataFrame = {
+ selectWithAdditionalCols(data, ROW_LINEAGE_META_COLUMNS)
+ }
+
+ private def selectWithAdditionalCols(data: DataFrame, additionalCols: Seq[String]): DataFrame = {
+ val dataColNames = data.schema.names
+ val mergedColNames = dataColNames ++ additionalCols.filterNot(dataColNames.contains)
+ data.select(mergedColNames.map(col): _*)
+ }
+}
+
+/** This wrapper is only used in java code, e.g. Procedure. */
+object ScanPlanHelper extends ScanPlanHelper {
+ def createNewScanPlan(
+ dataSplits: Array[DataSplit],
+ relation: DataSourceV2Relation): LogicalPlan = {
+ ScanPlanHelper.createNewScanPlan(dataSplits.toSeq, relation)
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
index f3c2121860..013395d7c2 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/PaimonSparkTestBase.scala
@@ -175,7 +175,7 @@ class PaimonSparkTestBase
paimonCatalog.getTable(Identifier.create(dbName, tableName)).asInstanceOf[FileStoreTable]
}
- protected def createRelationV2(tableName: String): LogicalPlan = {
+ protected def createRelationV2(tableName: String): DataSourceV2Relation = {
val sparkTable = SparkTable(loadTable(tableName))
DataSourceV2Relation.create(
sparkTable,
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
index e5f1d0e131..925344d49d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonOptimizationTestBase.scala
@@ -23,13 +23,11 @@ import org.apache.paimon.spark.PaimonSparkTestBase
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.catalyst.optimizer.MergePaimonScalarSubqueries
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{PaimonUtils, Row}
import org.apache.spark.sql.catalyst.expressions.{Attribute, CreateNamedStruct, Literal, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{CTERelationDef, LogicalPlan, OneRowRelation, WithCTE}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.functions._
-import org.apache.spark.sql.paimon.Utils
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
import org.junit.jupiter.api.Assertions
import scala.collection.immutable
@@ -62,7 +60,7 @@ abstract class PaimonOptimizationTestBase extends PaimonSparkTestBase with Expre
|""".stripMargin)
val optimizedPlan = Optimize.execute(query.queryExecution.analyzed)
- val df = Utils.createDataFrame(spark, createRelationV2("T"))
+ val df = PaimonUtils.createDataset(spark, createRelationV2("T"))
val mergedSubquery = df
.select(
toColumn(count(Literal(1))).as("cnt"),
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
new file mode 100644
index 0000000000..007b95e10b
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.source.DataSplit
+
+import org.apache.spark.sql.PaimonUtils.createDataset
+import org.apache.spark.sql.Row
+
+class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
+
+ test("ScanPlanHelper: create new scan plan and select with row lineage meta
cols") {
+ withTable("t") {
+ sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES
('row-tracking.enabled' = 'true')")
+ sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
+
+ val splits = getPaimonScan("SELECT * FROM
t").getOriginSplits.map(_.asInstanceOf[DataSplit])
+ val newScanPlan = createNewScanPlan(splits, createRelationV2("t"))
+ val newDf = createDataset(spark, newScanPlan)
+
+ // select original df should not contain meta cols
+ checkAnswer(newDf, Seq(Row(11, "a"), Row(22, "b")))
+
+ // select df with row lineage meta cols
+ checkAnswer(selectWithRowLineageMetaCols(newDf), Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
+
+ // select with row lineage meta cols twice should not add new more meta cols
+ checkAnswer(
+ selectWithRowLineageMetaCols(selectWithRowLineageMetaCols(newDf)),
+ Seq(Row(11, "a", 0, 1), Row(22, "b", 1, 1)))
+
+ // select df already contains meta cols with row lineage
+ checkAnswer(
+ selectWithRowLineageMetaCols(newDf.select("_ROW_ID", "id")),
+ Seq(Row(0, 11, 1), Row(1, 22, 1)))
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
index 61a479b9f0..b5700ea8ef 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/spark/sql/paimon/Utils.scala
@@ -18,9 +18,7 @@
package org.apache.spark.sql.paimon
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.paimon.shims.SparkShimLoader
+import org.apache.spark.sql.SparkSession
import org.apache.spark.util.{Utils => SparkUtils}
import java.io.File
@@ -35,9 +33,4 @@ object Utils {
def waitUntilEventEmpty(spark: SparkSession): Unit = {
spark.sparkContext.listenerBus.waitUntilEmpty()
}
-
- def createDataFrame(sparkSession: SparkSession, plan: LogicalPlan): DataFrame = {
- SparkShimLoader.shim.classicApi.createDataset(sparkSession, plan)
- }
-
}