This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 74c7f68c7d [spark] Minor refactor for spark v2 writer (#5564)
74c7f68c7d is described below

commit 74c7f68c7db1776b80c96f1d819131f851aa9b98
Author: Zouxxyy <[email protected]>
AuthorDate: Tue May 6 10:25:46 2025 +0800

    [spark] Minor refactor for spark v2 writer (#5564)
---
 .../generated/spark_connector_configuration.html   |  6 ++++
 .../analysis/expressions/ExpressionHelper.scala    | 37 ----------------------
 .../scala/org/apache/paimon/spark/SparkTable.scala | 14 ++++----
 .../analysis/expressions/ExpressionHelper.scala    | 14 ++++----
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  3 +-
 .../{SparkV2Write.scala => PaimonV2Write.scala}    | 24 +++++++++-----
 ...iteBuilder.scala => PaimonV2WriteBuilder.scala} | 14 ++++----
 .../write/{SparkWrite.scala => PaimonWrite.scala}  |  2 +-
 ...WriteBuilder.scala => PaimonWriteBuilder.scala} |  4 +--
 ...uirement.scala => PaimonWriteRequirement.scala} | 30 +++++++++++-------
 10 files changed, 64 insertions(+), 84 deletions(-)

diff --git 
a/docs/layouts/shortcodes/generated/spark_connector_configuration.html 
b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 7074cbc05b..06f5093477 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -80,5 +80,11 @@ under the License.
             <td>Boolean</td>
             <td>If true, allow to merge data types if the two types meet the 
rules for explicit casting.</td>
         </tr>
+        <tr>
+            <td><h5>write.use-v2-write</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.</td>
+        </tr>
     </tbody>
 </table>
diff --git 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index f4890f84bd..56223c36cd 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -20,12 +20,10 @@ package 
org.apache.paimon.spark.catalyst.analysis.expressions
 
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
 import org.apache.paimon.spark.SparkFilterConverter
-import org.apache.paimon.spark.write.SparkWriteBuilder
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilter}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, 
EqualNullSafe, EqualTo, Filter => SourceFilter}
 
 trait ExpressionHelper extends ExpressionHelperBase {
 
@@ -54,39 +52,4 @@ trait ExpressionHelper extends ExpressionHelperBase {
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
-
-  /**
-   * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
-   * methods where the `AlwaysTrue` Filter is used.
-   */
-  def isTruncate(filter: SourceFilter): Boolean = {
-    val filters = splitConjunctiveFilters(filter)
-    filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
-  }
-
-  /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
-  def convertPartitionFilterToMap(
-      filter: SourceFilter,
-      partitionRowType: RowType): Map[String, String] = {
-    // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
-    val converter = new SparkFilterConverter(partitionRowType)
-    splitConjunctiveFilters(filter).map {
-      case EqualNullSafe(attribute, value) =>
-        (attribute, converter.convertString(attribute, value))
-      case EqualTo(attribute, value) =>
-        (attribute, converter.convertString(attribute, value))
-      case _ =>
-        // Should not happen
-        throw new RuntimeException(
-          s"Only support Overwrite filters with Equal and EqualNullSafe, but got: $filter")
-    }.toMap
-  }
-
-  private def splitConjunctiveFilters(filter: SourceFilter): Seq[SourceFilter] 
= {
-    filter match {
-      case SourceAnd(filter1, filter2) =>
-        splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
-      case other => other :: Nil
-    }
-  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index bdc0d2cc29..03f1ff7af4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.options.Options
 import org.apache.paimon.spark.catalog.functions.BucketFunction
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.spark.write.{SparkV2WriteBuilder, SparkWriteBuilder}
+import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
 import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, 
KnownSplitsTable, Table}
 import org.apache.paimon.utils.StringUtils
 
@@ -47,7 +47,7 @@ case class SparkTable(table: Table)
   with PaimonPartitionManagement {
 
   private lazy val useV2Write: Boolean = {
-    val v2WriteConfigured = OptionUtils.useV2Write
+    val v2WriteConfigured = OptionUtils.useV2Write()
     v2WriteConfigured && supportsV2Write
   }
 
@@ -55,10 +55,8 @@ case class SparkTable(table: Table)
     table match {
       case storeTable: FileStoreTable =>
         storeTable.bucketMode() match {
-          case BucketMode.HASH_FIXED =>
-            storeTable.coreOptions().bucket() > 0 && 
BucketFunction.supportsTable(storeTable)
-          case BucketMode.BUCKET_UNAWARE =>
-            storeTable.coreOptions().bucket() == BucketMode.UNAWARE_BUCKET
+          case BucketMode.HASH_FIXED => 
BucketFunction.supportsTable(storeTable)
+          case BucketMode.BUCKET_UNAWARE => true
           case _ => false
         }
 
@@ -137,9 +135,9 @@ case class SparkTable(table: Table)
       case fileStoreTable: FileStoreTable =>
         val options = Options.fromMap(info.options)
         if (useV2Write) {
-          new SparkV2WriteBuilder(fileStoreTable, info.schema())
+          new PaimonV2WriteBuilder(fileStoreTable, info.schema())
         } else {
-          new SparkWriteBuilder(fileStoreTable, options)
+          new PaimonWriteBuilder(fileStoreTable, options)
         }
       case _ =>
         throw new RuntimeException("Only FileStoreTable can be written.")
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 2cb739ee6d..682cf88fcf 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.analysis.expressions
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
 import org.apache.paimon.spark.{SparkFilterConverter, SparkV2FilterConverter}
 import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.write.SparkWriteBuilder
+import org.apache.paimon.spark.write.PaimonWriteBuilder
 import org.apache.paimon.types.RowType
 
 import org.apache.spark.sql.{Column, SparkSession}
@@ -66,6 +66,11 @@ trait ExpressionHelper extends ExpressionHelperBase {
       Some(PredicateBuilder.and(predicates: _*))
     }
   }
+}
+
+trait ExpressionHelperBase extends PredicateHelper {
+
+  import ExpressionHelper._
 
   /**
    * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call 
the `truncate`
@@ -76,7 +81,7 @@ trait ExpressionHelper extends ExpressionHelperBase {
     filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
   }
 
-  /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
+  /** See [[ PaimonWriteBuilder#failIfCanNotOverwrite]] */
   def convertPartitionFilterToMap(
       filter: SourceFilter,
       partitionRowType: RowType): Map[String, String] = {
@@ -101,11 +106,6 @@ trait ExpressionHelper extends ExpressionHelperBase {
       case other => other :: Nil
     }
   }
-}
-
-trait ExpressionHelperBase extends PredicateHelper {
-
-  import ExpressionHelper._
 
   def toColumn(expr: Expression): Column = {
     SparkShimLoader.getSparkShim.column(expr)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 831a7966f3..5ff7b7a0e2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -25,7 +25,6 @@ import org.apache.paimon.crosspartition.{IndexBootstrap, 
KeyPartOrRow}
 import org.apache.paimon.data.serializer.InternalSerializers
 import org.apache.paimon.deletionvectors.DeletionVector
 import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
-import org.apache.paimon.fs.Path
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
 import org.apache.paimon.manifest.FileKind
@@ -242,7 +241,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
         writeWithoutBucket(data)
 
       case HASH_FIXED =>
-        if (table.bucketSpec().getNumBuckets == -2) {
+        if (table.bucketSpec().getNumBuckets == POSTPONE_BUCKET) {
           writeWithoutBucket(data)
         } else if (paimonExtensionEnabled && 
BucketFunction.supportsTable(table)) {
           // Topology: input -> shuffle by partition & bucket
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
similarity index 91%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 3f2018060e..a97cccfbbc 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -35,7 +35,7 @@ import java.io.{IOException, UncheckedIOException}
 import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Success, Try}
 
-class SparkV2Write(
+class PaimonV2Write(
     storeTable: FileStoreTable,
     overwriteDynamic: Boolean,
     overwritePartitions: Option[Map[String, String]],
@@ -58,7 +58,7 @@ class SparkV2Write(
     builder
   }
 
-  private val writeRequirement = SparkWriteRequirement(table)
+  private val writeRequirement = PaimonWriteRequirement(table)
 
   override def requiredDistribution(): Distribution = {
     val distribution = writeRequirement.distribution
@@ -74,11 +74,19 @@ class SparkV2Write(
 
   override def toBatch: BatchWrite = new PaimonBatchWrite
 
-  override def toString: String =
-    if (overwriteDynamic)
-      s"PaimonWrite(table=${table.fullName()}, overwriteDynamic=true)"
-    else
-      s"PaimonWrite(table=${table.fullName()}, overwritePartitions=$overwritePartitions)"
+  override def toString: String = {
+    val overwriteDynamicStr = if (overwriteDynamic) {
+      ", overwriteDynamic=true"
+    } else {
+      ""
+    }
+    val overwritePartitionsStr = overwritePartitions match {
+      case Some(partitions) if partitions.nonEmpty => s", overwritePartitions=$partitions"
+      case Some(_) => ", overwriteTable=true"
+      case None => ""
+    }
+    s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
+  }
 
   private class PaimonBatchWrite extends BatchWrite {
     override def createBatchWriterFactory(info: PhysicalWriteInfo): 
DataWriterFactory =
@@ -91,7 +99,7 @@ class SparkV2Write(
       val batchTableCommit = batchWriteBuilder.newCommit()
 
       val commitMessages = messages.collect {
-        case taskCommit: TaskCommit => taskCommit.commitMessages
+        case taskCommit: TaskCommit => taskCommit.commitMessages()
         case other =>
           throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
       }.flatten
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
similarity index 82%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
index a8b9a1c211..462fd3311f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
@@ -24,16 +24,16 @@ import 
org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsO
 import org.apache.spark.sql.sources.{And, Filter}
 import org.apache.spark.sql.types.StructType
 
-class SparkV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
+class PaimonV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
   extends BaseWriteBuilder(table)
   with SupportsOverwrite
   with SupportsDynamicOverwrite {
 
   private var overwriteDynamic = false
-  private var overwritePartitions: Map[String, String] = null
+  private var overwritePartitions: Option[Map[String, String]] = None
 
   override def build =
-    new SparkV2Write(table, overwriteDynamic, 
Option.apply(overwritePartitions), writeSchema)
+    new PaimonV2Write(table, overwriteDynamic, overwritePartitions, 
writeSchema)
 
   override def overwrite(filters: Array[Filter]): WriteBuilder = {
     if (overwriteDynamic) {
@@ -49,17 +49,17 @@ class SparkV2WriteBuilder(table: FileStoreTable, 
writeSchema: StructType)
     }
 
     if (isTruncate(conjunctiveFilters.get)) {
-      overwritePartitions = Map.empty[String, String]
+      overwritePartitions = Option.apply(Map.empty[String, String])
     } else {
-      overwritePartitions =
-        convertPartitionFilterToMap(conjunctiveFilters.get, 
table.schema.logicalPartitionType())
+      overwritePartitions = Option.apply(
+        convertPartitionFilterToMap(conjunctiveFilters.get, 
table.schema.logicalPartitionType()))
     }
 
     this
   }
 
   override def overwriteDynamicPartitions(): WriteBuilder = {
-    if (overwritePartitions != null) {
+    if (overwritePartitions.isDefined) {
       throw new IllegalArgumentException("Cannot overwrite dynamically and by filter both")
     }
 
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWrite.scala
similarity index 94%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWrite.scala
index d8c8dc942c..3ed93b0dd3 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWrite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.sources.InsertableRelation
 
 /** Spark [[V1Write]], it is required to use v1 write for grouping by bucket. 
*/
-class SparkWrite(val table: FileStoreTable, saveMode: SaveMode, options: 
Options) extends V1Write {
+class PaimonWrite(val table: FileStoreTable, saveMode: SaveMode, options: 
Options) extends V1Write {
 
   override def toInsertableRelation: InsertableRelation = {
     (data: DataFrame, overwrite: Boolean) =>
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
similarity index 92%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
index bfb76552fb..8ce226d619 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
@@ -25,13 +25,13 @@ import org.apache.paimon.table.FileStoreTable
 import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
 import org.apache.spark.sql.sources._
 
-class SparkWriteBuilder(table: FileStoreTable, options: Options)
+class PaimonWriteBuilder(table: FileStoreTable, options: Options)
   extends BaseWriteBuilder(table)
   with SupportsOverwrite {
 
   private var saveMode: SaveMode = InsertInto
 
-  override def build = new SparkWrite(table, saveMode, options)
+  override def build = new PaimonWrite(table, saveMode, options)
 
   override def overwrite(filters: Array[Filter]): WriteBuilder = {
     failIfCanNotOverwrite(filters)
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
similarity index 73%
rename from 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
rename to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
index b55d5ca85a..f2e52fda79 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
@@ -19,6 +19,7 @@
 package org.apache.paimon.spark.write
 
 import org.apache.paimon.table.{BucketMode, FileStoreTable}
+import org.apache.paimon.table.BucketMode._
 
 import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, 
Distribution, Distributions}
 import org.apache.spark.sql.connector.expressions.{Expression, Expressions, 
SortOrder}
@@ -26,22 +27,27 @@ import 
org.apache.spark.sql.connector.expressions.{Expression, Expressions, Sort
 import scala.collection.JavaConverters._
 
 /** Distribution requirements of Spark write. */
-case class SparkWriteRequirement(distribution: Distribution, ordering: 
Array[SortOrder])
+case class PaimonWriteRequirement(distribution: Distribution, ordering: 
Array[SortOrder])
+
+object PaimonWriteRequirement {
 
-object SparkWriteRequirement {
   private val EMPTY_ORDERING: Array[SortOrder] = Array.empty
-  private val EMPTY: SparkWriteRequirement =
-    SparkWriteRequirement(Distributions.unspecified(), EMPTY_ORDERING)
+  private val EMPTY: PaimonWriteRequirement =
+    PaimonWriteRequirement(Distributions.unspecified(), EMPTY_ORDERING)
 
-  def apply(table: FileStoreTable): SparkWriteRequirement = {
+  def apply(table: FileStoreTable): PaimonWriteRequirement = {
     val bucketSpec = table.bucketSpec()
     val bucketTransforms = bucketSpec.getBucketMode match {
-      case BucketMode.HASH_FIXED =>
-        Seq(
-          Expressions.bucket(
-            bucketSpec.getNumBuckets,
-            bucketSpec.getBucketKeys.asScala.map(quote).toArray: _*))
-      case BucketMode.BUCKET_UNAWARE =>
+      case HASH_FIXED =>
+        if (bucketSpec.getNumBuckets == POSTPONE_BUCKET) {
+          Seq.empty
+        } else {
+          Seq(
+            Expressions.bucket(
+              bucketSpec.getNumBuckets,
+              bucketSpec.getBucketKeys.asScala.map(quote).toArray: _*))
+        }
+      case BUCKET_UNAWARE =>
         Seq.empty
       case _ =>
         throw new UnsupportedOperationException(
@@ -58,7 +64,7 @@ object SparkWriteRequirement {
     } else {
       val distribution: ClusteredDistribution =
         Distributions.clustered(clusteringExpressions)
-      SparkWriteRequirement(distribution, EMPTY_ORDERING)
+      PaimonWriteRequirement(distribution, EMPTY_ORDERING)
     }
   }
 

Reply via email to