This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ed8f646bbe [spark] Minor refactor code in PaimonCommand (#6089)
ed8f646bbe is described below

commit ed8f646bbe70d924cd69e79b923fd5c9fa77cc24
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Aug 19 12:01:56 2025 +0800

    [spark] Minor refactor code in PaimonCommand (#6089)
---
 .../commands/DeleteFromPaimonTableCommand.scala      | 14 ++++++--------
 .../paimon/spark/commands/MergeIntoPaimonTable.scala | 20 +++++++++-----------
 ...imonCommand.scala => PaimonRowLevelCommand.scala} | 12 +++++++-----
 .../paimon/spark/commands/PaimonSparkWriter.scala    |  4 ++--
 .../spark/commands/UpdatePaimonTableCommand.scala    | 18 ++++++++----------
 .../paimon/spark/commands/WriteIntoPaimonTable.scala |  8 +++-----
 .../org/apache/paimon/spark/sources/PaimonSink.scala |  2 +-
 7 files changed, 36 insertions(+), 42 deletions(-)
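
In short: the PaimonCommand trait is renamed to PaimonRowLevelCommand, now extends
PaimonLeafRunnableCommand itself, and its dvSafeWriter member becomes writer; the
delete/update/merge-into commands extend the new trait directly, while
WriteIntoPaimonTable and PaimonSink drop the dependency on it. A condensed sketch of
the resulting trait, pieced together from the hunks below (the bodies of the writer
branches are an assumption here, since this diff shows only the condition and comments):

    /** Helper trait for paimon row level command, like delete, update, merge into. */
    trait PaimonRowLevelCommand
      extends PaimonLeafRunnableCommand
      with WithFileStoreTable
      with ExpressionHelper
      with ScanPlanHelper
      with SQLConfHelper {

      // Formerly `dvSafeWriter`. Fixed-bucket non-pk tables keep a write-only writer,
      // because deletion vectors may be generated while writing and a concurrent
      // compaction could conflict with them.
      lazy val writer: PaimonSparkWriter = {
        if (table.primaryKeys().isEmpty && table.bucketMode() == BucketMode.HASH_FIXED) {
          PaimonSparkWriter(table).writeOnly() // assumed body; not shown in this diff
        } else {
          PaimonSparkWriter(table)
        }
      }
    }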

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index d6a30f9097..6080cd0378 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -19,7 +19,6 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
 import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.PrimaryKeyTableUtils.validatePKUpsertDeletable
@@ -41,8 +40,7 @@ case class DeleteFromPaimonTableCommand(
     relation: DataSourceV2Relation,
     override val table: FileStoreTable,
     condition: Expression)
-  extends PaimonLeafRunnableCommand
-  with PaimonCommand
+  extends PaimonRowLevelCommand
   with ExpressionHelper
   with SupportsSubquery {
 
@@ -90,7 +88,7 @@ case class DeleteFromPaimonTableCommand(
         if (dropPartitions.nonEmpty) {
           commit.truncatePartitions(dropPartitions.asJava)
         } else {
-          dvSafeWriter.commit(Seq.empty)
+          writer.commit(Seq.empty)
         }
       } else {
         val commitMessages = if (usePKUpsertDelete()) {
@@ -98,7 +96,7 @@ case class DeleteFromPaimonTableCommand(
         } else {
           performNonPrimaryKeyDelete(sparkSession)
         }
-        dvSafeWriter.commit(commitMessages)
+        writer.commit(commitMessages)
       }
     }
 
@@ -117,7 +115,7 @@ case class DeleteFromPaimonTableCommand(
   private def performPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
     val df = createDataset(sparkSession, Filter(condition, relation))
       .withColumn(ROW_KIND_COL, lit(RowKind.DELETE.toByteValue))
-    dvSafeWriter.write(df)
+    writer.write(df)
   }
 
   private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
@@ -135,7 +133,7 @@ case class DeleteFromPaimonTableCommand(
         sparkSession)
 
       // Step3: update the touched deletion vectors and index files
-      dvSafeWriter.persistDeletionVectors(deletionVectors)
+      writer.persistDeletionVectors(deletionVectors)
     } else {
       // Step2: extract out the exactly files, which must have at least one record to be updated.
       val touchedFilePaths =
@@ -153,7 +151,7 @@ case class DeleteFromPaimonTableCommand(
       }
 
       // only write new files, should have no compaction
-      val addCommitMessage = dvSafeWriter.writeOnly().withRowLineage().write(data)
+      val addCommitMessage = writer.writeOnly().withRowLineage().write(data)
 
       // Step5: convert the deleted files that need to be written to commit message.
       val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 24d94bc56c..3a8a301734 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -20,7 +20,6 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.spark.SparkTable
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.{PaimonMetadataColumn, SparkSystemColumns}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
 import org.apache.paimon.spark.util.{EncoderUtils, SparkRowUtils}
@@ -51,8 +50,7 @@ case class MergeIntoPaimonTable(
     matchedActions: Seq[MergeAction],
     notMatchedActions: Seq[MergeAction],
     notMatchedBySourceActions: Seq[MergeAction])
-  extends PaimonLeafRunnableCommand
-  with PaimonCommand {
+  extends PaimonRowLevelCommand {
 
   import MergeIntoPaimonTable._
 
@@ -79,12 +77,12 @@ case class MergeIntoPaimonTable(
     } else {
       performMergeForNonPkTable(sparkSession)
     }
-    dvSafeWriter.commit(commitMessages)
+    writer.commit(commitMessages)
     Seq.empty[Row]
   }
 
   private def performMergeForPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
-    dvSafeWriter.write(
+    writer.write(
       constructChangedRows(
         sparkSession,
         createDataset(sparkSession, filteredTargetPlan),
@@ -117,14 +115,14 @@ case class MergeIntoPaimonTable(
         val dvDS = ds.where(
           s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = 
${RowKind.UPDATE_AFTER.toByteValue}")
         val deletionVectors = collectDeletionVectors(dataFilePathToMeta, dvDS, sparkSession)
-        val indexCommitMsg = dvSafeWriter.persistDeletionVectors(deletionVectors)
+        val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
 
         // Step4: filter rows that should be written as the inserted/updated data.
         val toWriteDS = ds
           .where(
             s"$ROW_KIND_COL = ${RowKind.INSERT.toByteValue} or $ROW_KIND_COL = 
${RowKind.UPDATE_AFTER.toByteValue}")
           .drop(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
-        val addCommitMessage = dvSafeWriter.write(toWriteDS)
+        val addCommitMessage = writer.write(toWriteDS)
 
         // Step5: commit index and data commit messages
         addCommitMessage ++ indexCommitMsg
@@ -192,12 +190,12 @@ case class MergeIntoPaimonTable(
         filesToRewrittenDS.union(filesToReadDS),
         writeRowLineage = writeRowLineage).drop(ROW_KIND_COL)
 
-      val writer = if (writeRowLineage) {
-        dvSafeWriter.withRowLineage()
+      val finalWriter = if (writeRowLineage) {
+        writer.withRowLineage()
       } else {
-        dvSafeWriter
+        writer
       }
-      val addCommitMessage = writer.write(toWriteDS)
+      val addCommitMessage = finalWriter.write(toWriteDS)
       val deletedCommitMessage = buildDeletedCommitMessage(filesToRewritten)
 
       addCommitMessage ++ deletedCommitMessage
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
similarity index 96%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index f628f31f16..7aea754040 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.index.IndexFileMeta
 import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement, IndexIncrement}
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
 import org.apache.paimon.spark.util.ScanPlanHelper
 import org.apache.paimon.table.BucketMode
@@ -46,16 +47,17 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-/** Helper trait for all paimon commands. */
-trait PaimonCommand
-  extends WithFileStoreTable
+/** Helper trait for paimon row level command, like delete, update, merge into. */
+trait PaimonRowLevelCommand
+  extends PaimonLeafRunnableCommand
+  with WithFileStoreTable
   with ExpressionHelper
   with ScanPlanHelper
   with SQLConfHelper {
 
-  lazy val dvSafeWriter: PaimonSparkWriter = {
+  lazy val writer: PaimonSparkWriter = {
     if (table.primaryKeys().isEmpty && table.bucketMode() == BucketMode.HASH_FIXED) {
-
+      // todo: figure out why fixed-bucket non-pk tables need write-only
       /**
       * Writer without compaction, note that some operations may generate Deletion Vectors, and
       * writing may occur at the same time as generating deletion vectors. If compaction occurs at
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index be5e1c30d6..a440ad353a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -301,8 +301,8 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowLineage: Boolean = f
 
   /**
    * Write all the deletion vectors to the index files. If it's in unaware mode, one index file maps
-   * deletion vectors; else, one index file will contains all deletion vector with the same
-   * partition and bucket.
+   * deletion vectors; else, one index file will contain all deletion vector with the same partition
+   * and bucket.
    */
   def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]): Seq[CommitMessage] = {
     val sparkSession = deletionVectors.sparkSession
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 6b47cd74f8..e304480bae 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -19,10 +19,9 @@
 package org.apache.paimon.spark.commands
 
 import org.apache.paimon.spark.catalyst.analysis.AssignmentAlignmentHelper
-import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
 import org.apache.paimon.spark.schema.PaimonMetadataColumn.{ROW_ID_COLUMN, SEQUENCE_NUMBER_COLUMN}
 import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.table.{FileStoreTable, SpecialFields}
+import org.apache.paimon.table.FileStoreTable
 import org.apache.paimon.table.sink.CommitMessage
 import org.apache.paimon.table.source.DataSplit
 import org.apache.paimon.types.RowKind
@@ -31,7 +30,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.PaimonUtils.createDataset
 import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, If, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, LogicalPlan, Project, SupportsSubquery}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, lit}
 
@@ -40,8 +39,7 @@ case class UpdatePaimonTableCommand(
     override val table: FileStoreTable,
     condition: Expression,
     assignments: Seq[Assignment])
-  extends PaimonLeafRunnableCommand
-  with PaimonCommand
+  extends PaimonRowLevelCommand
   with AssignmentAlignmentHelper
   with SupportsSubquery {
 
@@ -58,7 +56,7 @@ case class UpdatePaimonTableCommand(
     } else {
       performUpdateForNonPkTable(sparkSession)
     }
-    dvSafeWriter.commit(commitMessages)
+    writer.commit(commitMessages)
 
     Seq.empty[Row]
   }
@@ -68,7 +66,7 @@ case class UpdatePaimonTableCommand(
     val updatedPlan = Project(updateExpressions, Filter(condition, relation))
     val df = createDataset(sparkSession, updatedPlan)
       .withColumn(ROW_KIND_COL, lit(RowKind.UPDATE_AFTER.toByteValue))
-    dvSafeWriter.write(df)
+    writer.write(df)
   }
 
   /** Update for table without primary keys */
@@ -102,7 +100,7 @@ case class UpdatePaimonTableCommand(
           val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
 
           // Step4: write these deletion vectors.
-          val indexCommitMsg = dvSafeWriter.persistDeletionVectors(deletionVectors)
+          val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
 
           addCommitMessage ++ indexCommitMsg
         } finally {
@@ -138,7 +136,7 @@ case class UpdatePaimonTableCommand(
 
     val toUpdateScanRelation = createNewScanPlan(touchedDataSplits, relation, Some(condition))
     val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
-    dvSafeWriter.write(data)
+    writer.write(data)
   }
 
   private def writeUpdatedAndUnchangedData(
@@ -163,7 +161,7 @@ case class UpdatePaimonTableCommand(
     }
 
     val data = createDataset(sparkSession, toUpdateScanRelation).select(updateColumns: _*)
-    dvSafeWriter.withRowLineage().write(data)
+    writer.withRowLineage().write(data)
   }
 
   private def optimizedIf(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 5a1beff25e..5a9b36e269 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -21,15 +21,13 @@ package org.apache.paimon.spark.commands
 import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark._
-import org.apache.paimon.spark.schema.SparkSystemColumns
-import org.apache.paimon.spark.util.OptionUtils
+import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, PaimonUtils, Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.functions.{col, lit}
 
 import scala.collection.JavaConverters._
 
@@ -40,7 +38,7 @@ case class WriteIntoPaimonTable(
     _data: DataFrame,
     options: Options)
   extends RunnableCommand
-  with PaimonCommand
+  with ExpressionHelper
   with SchemaHelper
   with Logging {
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
index 54a7a1114c..3387a536ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/sources/PaimonSink.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.sources
 
 import org.apache.paimon.options.Options
 import org.apache.paimon.spark.{InsertInto, Overwrite}
-import org.apache.paimon.spark.commands.{PaimonCommand, SchemaHelper, WriteIntoPaimonTable}
+import org.apache.paimon.spark.commands.{SchemaHelper, WriteIntoPaimonTable}
 import org.apache.paimon.table.FileStoreTable
 
 import org.apache.spark.sql.{DataFrame, PaimonUtils, SQLContext}
