This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed8f646bbe [spark] Minor refactor code in PaimonCommand (#6089)
ed8f646bbe is described below
commit ed8f646bbe70d924cd69e79b923fd5c9fa77cc24
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Aug 19 12:01:56 2025 +0800
[spark] Minor refactor code in PaimonCommand (#6089)
---
.../commands/DeleteFromPaimonTableCommand.scala | 14 ++++++--------
.../paimon/spark/commands/MergeIntoPaimonTable.scala | 20 +++++++++-----------
...imonCommand.scala => PaimonRowLevelCommand.scala} | 12 +++++++-----
.../paimon/spark/commands/PaimonSparkWriter.scala | 4 ++--
.../spark/commands/UpdatePaimonTableCommand.scala | 18 ++++++++----------
.../paimon/spark/commands/WriteIntoPaimonTable.scala | 8 +++-----
.../org/apache/paimon/spark/sources/PaimonSink.scala | 2 +-
7 files changed, 36 insertions(+), 42 deletions(-)
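For context on the diffstat above: the commit folds PaimonLeafRunnableCommand into the renamed PaimonRowLevelCommand trait, renames its dvSafeWriter member to writer, and lets WriteIntoPaimonTable drop PaimonCommand in favor of ExpressionHelper. A minimal standalone sketch of the resulting shape (stub types stand in for the real Paimon and Spark classes; nothing below is the actual implementation):

object PaimonRowLevelCommandSketch {
  // Stub stand-ins for the real Paimon/Spark types; only the shape matters here.
  trait PaimonLeafRunnableCommand
  trait WithFileStoreTable { def table: String }
  case class PaimonSparkWriter(table: String)

  // PaimonCommand is renamed to PaimonRowLevelCommand, mixes in the leaf-command
  // base itself, and its `dvSafeWriter` member is now simply called `writer`.
  trait PaimonRowLevelCommand extends PaimonLeafRunnableCommand with WithFileStoreTable {
    lazy val writer: PaimonSparkWriter = PaimonSparkWriter(table)
  }

  // Row-level commands (delete/update/merge) now extend the single trait.
  case class DeleteFromPaimonTableSketch(table: String) extends PaimonRowLevelCommand {
    def run(): Unit = println(s"commit via $writer")
  }

  def main(args: Array[String]): Unit = DeleteFromPaimonTableSketch("t1").run()
}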
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index d6a30f9097..6080cd0378 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,7 +19,6 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable
@@ -41,8 +40,7 @@ case class DeleteFromPaimonTableCommand(
relation: DataSourceV2Relation,
override val table: FileStoreTable,
condition: Expression)
- extends PaimonLeafRunnableCommand
- with PaimonCommand
+ extends PaimonRowLevelCommand
with ExpressionHelper
with SupportsSubquery {
@@ -90,7 +88,7 @@ case class DeleteFromPaimonTableCommand(
if (dropPartitions.nonEmpty) {
commit.truncatePartitions(dropPartitions.asJava)
} else {
- dvSafeWriter.commit(Seq.empty)
+ writer.commit(Seq.empty)
}
} else {
val commitMessages = if (usePKUpsertDelete()) {
@@ -98,7 +96,7 @@ case class DeleteFromPaimonTableCommand(
} else {
performNonPrimaryKeyDelete(sparkSession)
}
- dvSafeWriter.commit(commitMessages)
+ writer.commit(commitMessages)
}
}
@@ -117,7 +115,7 @@ case class DeleteFromPaimonTableCommand(
private def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
val df = createDataset(sparkSession, Filter(condition, relation))
.withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
- dvSafeWriter.write(df)
+ writer.write(df)
}
private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
@@ -135,7 +133,7 @@ case class DeleteFromPaimonTableCommand(
sparkSession)
// Step3: update the touched deletion vectors and index files
- dvSafeWriter.persistDeletionVectors(deletionVectors)
+ writer.persistDeletionVectors(deletionVectors)
} else {
// Step2: extract out the exactly files, which must have at least one record to be updated.
val touchedFilePaths =
@@ -153,7 +151,7 @@ case class DeleteFromPaimonTableCommand(
}
// only write new files, should have no compaction
- val addCommitMessage = dvSafeWriter.writeOnly().withRowLineage().write(data)
+ val addCommitMessage = writer.writeOnly().withRowLineage().write(data)
// Step5: convert the deleted files that need to be written to commit message.
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 24d94bc56c..3a8a301734 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -20,7 +20,6 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
@@ -51,8 +50,7 @@ case class MergeIntoPaimonTable(
matchedActions: Seq[MergeAction],
notMatchedActions: Seq[MergeAction],
notMatchedBySourceActions: Seq[MergeAction])
- extends PaimonLeafRunnableCommand
- with PaimonCommand {
+ extends PaimonRowLevelCommand {
import MergeIntoPaimonTable._
@@ -79,12 +77,12 @@ case class MergeIntoPaimonTable(
} else {
performMergeForNonPkTable(sparkSession)
}
- dvSafeWriter.commit(commitMessages)
+ writer.commit(commitMessages)
Seq.empty[Row]
}
private def performMergeForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
- dvSafeWriter.write(
+ writer.write(
constructChangedRows(
sparkSession,
createDataset(sparkSession, filteredTargetPlan),
@@ -117,14 +115,14 @@ case class MergeIntoPaimonTable(
val dvDS = ds.where(
s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
val deletionVectors = collectDeletionVectors(dataFilePathToMeta, dvDS, sparkSession)
- val indexCommitMsg = dvSafeWriter.persistDeletionVectors(deletionVectors)
+ val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
// Step4: filter rows that should be written as the inserted/updated data.
val toWriteDS = ds
.where(
s"$ROW_KIND_COL = ${RowKind.INSERT.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
.drop(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
- val addCommitMessage = dvSafeWriter.write(toWriteDS)
+ val addCommitMessage = writer.write(toWriteDS)
// Step5: commit index and data commit messages
addCommitMessage ++ indexCommitMsg
@@ -192,12 +190,12 @@ case class MergeIntoPaimonTable(
filesToRewrittenDS.union(filesToReadDS),
writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
- val writer = if (writeRowLineage) {
- dvSafeWriter.withRowLineage()
+ val finalWriter = if (writeRowLineage) {
+ writer.withRowLineage()
} else {
- dvSafeWriter
+ writer
}
- val addCommitMessage = writer.write(toWriteDS)
+ val addCommitMessage = finalWriter.write(toWriteDS)
val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
addCommitMessage ++ deletedCommitMessage
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
similarity index 96%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index f628f31f16..7aea754040 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.spark.util.ScanPlanHelper
import org.apache.paimon.table.BucketMode
@@ -46,16 +47,17 @@ import java.util.Collections
import scala.collection.JavaConverters._
-/** Helper trait for all paimon commands. */
-trait PaimonCommand
- extends WithFileStoreTable
+/** Helper trait for paimon row-level commands, like delete, update, merge into. */
+trait PaimonRowLevelCommand
+ extends PaimonLeafRunnableCommand
+ with WithFileStoreTable
with ExpressionHelper
with ScanPlanHelper
with SQLConfHelper {
- lazy val dvSafeWriter: PaimonSparkWriter = {
+ lazy val writer: PaimonSparkWriter = {
if (table.primaryKeys().isEmpty && table.bucketMode() == BucketMode.HASH_FIXED) {
-
+ // todo: figure out why fixed-bucket non-pk tables need write only
/**
* Writer without compaction, note that some operations may generate Deletion Vectors, and
* writing may occur at the same time as generating deletion vectors. If compaction occurs at
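The truncated doc comment above is the rationale the trait keeps for its writer: fixed-bucket tables without primary keys get a write-only writer so that compaction cannot run while deletion vectors are being generated for the same files. A rough self-contained sketch of that selection, using stub types rather than the real PaimonSparkWriter API:

object WriterSelectionSketch {
  sealed trait BucketMode
  case object HashFixed extends BucketMode
  case object BucketUnaware extends BucketMode

  trait Table { def primaryKeys: Seq[String]; def bucketMode: BucketMode }
  case class Writer(writeOnly: Boolean) { def writeOnlyMode: Writer = copy(writeOnly = true) }

  // Fixed-bucket tables without primary keys get a write-only writer, so no
  // compaction can run while deletion vectors are being generated for the same files.
  def selectWriter(table: Table): Writer = {
    val base = Writer(writeOnly = false)
    if (table.primaryKeys.isEmpty && table.bucketMode == HashFixed) base.writeOnlyMode else base
  }

  def main(args: Array[String]): Unit = {
    val nonPkFixedBucket = new Table {
      val primaryKeys: Seq[String] = Seq.empty
      val bucketMode: BucketMode = HashFixed
    }
    println(selectWriter(nonPkFixedBucket)) // Writer(true)
  }
}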
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index be5e1c30d6..a440ad353a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -301,8 +301,8 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowLineage: Boolean = f
/**
* Write all the deletion vectors to the index files. If it's in unaware mode, one index file maps
- * deletion vectors; else, one index file will contains all deletion vector with the same
- * partition and bucket.
+ * deletion vectors; else, one index file will contain all deletion vector with the same partition
+ * and bucket.
*/
def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]): Seq[CommitMessage] = {
val sparkSession = deletionVectors.sparkSession
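The reworded comment describes how deletion vectors map to index files: in unaware-bucket mode they are written out individually, otherwise all vectors sharing a partition and bucket land in one index file. A toy grouping sketch under that reading (hypothetical types; the real persistDeletionVectors works on Dataset[SparkDeletionVector]):

object DeletionVectorGroupingSketch {
  // Hypothetical flat representation of a deletion vector entry.
  case class DeletionVector(partition: String, bucket: Int, dataFile: String)

  // Bucket-aware case: every deletion vector with the same (partition, bucket) goes
  // into one index file; in unaware-bucket mode each vector is written out on its own.
  def groupForIndexFiles(
      dvs: Seq[DeletionVector],
      bucketUnaware: Boolean): Map[String, Seq[DeletionVector]] =
    if (bucketUnaware) dvs.map(dv => dv.dataFile -> Seq(dv)).toMap
    else dvs.groupBy(dv => s"${dv.partition}/bucket-${dv.bucket}")

  def main(args: Array[String]): Unit = {
    val dvs = Seq(
      DeletionVector("dt=2025-08-19", 0, "f1.parquet"),
      DeletionVector("dt=2025-08-19", 0, "f2.parquet"),
      DeletionVector("dt=2025-08-19", 1, "f3.parquet"))
    // Two groups: dt=2025-08-19/bucket-0 (f1, f2) and dt=2025-08-19/bucket-1 (f3).
    groupForIndexFiles(dvs, bucketUnaware = false).foreach(println)
  }
}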
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 6b47cd74f8..e304480bae 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -19,10 +19,9 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.PaimonMetadataColumn.{ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.table.{FileStoreTable, SpecialFields}
+import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowKind
@@ -31,7 +30,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, LogicalPlan, Project, SupportsSubquery}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.{col, lit}
@@ -40,8 +39,7 @@ case class UpdatePaimonTableCommand(
override val table: FileStoreTable,
condition: Expression,
assignments: Seq[Assignment])
- extends PaimonLeafRunnableCommand
- with PaimonCommand
+ extends PaimonRowLevelCommand
with AssignmentAlignmentHelper
with SupportsSubquery {
@@ -58,7 +56,7 @@ case class UpdatePaimonTableCommand(
} else {
performUpdateForNonPkTable(sparkSession)
}
- dvSafeWriter.commit(commitMessages)
+ writer.commit(commitMessages)
Seq.empty[Row]
}
@@ -68,7 +66,7 @@ case class UpdatePaimonTableCommand(
val updatedPlan = Project(updateExpressions, Filter(condition, relation))
val df = createDataset(sparkSession, updatedPlan)
.withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue))
- dvSafeWriter.write(df)
+ writer.write(df)
}
/** Update for table without primary keys */
@@ -102,7 +100,7 @@ case class UpdatePaimonTableCommand(
val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
// Step4: write these deletion vectors.
- val indexCommitMsg = dvSafeWriter.persistDeletionVectors(deletionVectors)
+ val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
addCommitMessage ++ indexCommitMsg
} finally {
@@ -138,7 +136,7 @@ case class UpdatePaimonTableCommand(
val toUpdateScanRelation = createNewScanPlan(touchedDataSplits, relation, Some(condition))
val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
- dvSafeWriter.write(data)
+ writer.write(data)
}
private def writeUpdatedAndUnchangedData(
@@ -163,7 +161,7 @@ case class UpdatePaimonTableCommand(
}
val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
- dvSafeWriter.withRowLineage().write(data)
+ writer.withRowLineage().write(data)
}
private def optimizedIf(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 5a1beff25e..5a9b36e269 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -21,15 +21,13 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
import org.apache.paimon.options.Options
import org.apache.paimon.spark._
-import org.apache.paimon.spark.schema.SparkSystemColumns
-import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.functions.{col, lit}
import scala.collection.JavaConverters._
@@ -40,7 +38,7 @@ case class WriteIntoPaimonTable(
_data: DataFrame,
options: Options)
extends RunnableCommand
- with PaimonCommand
+ with ExpressionHelper
with SchemaHelper
with Logging {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
index 54a7a1114c..3387a536ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sources
import org.apache.paimon.options.Options
import org.apache.paimon.spark.{InsertInto, Overwrite}
-import org.apache.paimon.spark.commands.{PaimonCommand, SchemaHelper, WriteIntoPaimonTable}
+import org.apache.paimon.spark.commands.{SchemaHelper, WriteIntoPaimonTable}
import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.{DataFrame, PaimonUtils, SQLContext}