This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 74c7f68c7d [spark] Minor refactor for spark v2 writer (#5564)
74c7f68c7d is described below
commit 74c7f68c7db1776b80c96f1d819131f851aa9b98
Author: Zouxxyy <[email protected]>
AuthorDate: Tue May 6 10:25:46 2025 +0800
[spark] Minor refactor for spark v2 writer (#5564)
---
.../generated/spark_connector_configuration.html | 6 ++++
.../analysis/expressions/ExpressionHelper.scala | 37 ----------------------
.../scala/org/apache/paimon/spark/SparkTable.scala | 14 ++++----
.../analysis/expressions/ExpressionHelper.scala | 14 ++++----
.../paimon/spark/commands/PaimonSparkWriter.scala | 3 +-
.../{SparkV2Write.scala => PaimonV2Write.scala} | 24 +++++++++-----
...iteBuilder.scala => PaimonV2WriteBuilder.scala} | 14 ++++----
.../write/{SparkWrite.scala => PaimonWrite.scala} | 2 +-
...WriteBuilder.scala => PaimonWriteBuilder.scala} | 4 +--
...uirement.scala => PaimonWriteRequirement.scala} | 30 +++++++++++-------
10 files changed, 64 insertions(+), 84 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 7074cbc05b..06f5093477 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -80,5 +80,11 @@ under the License.
<td>Boolean</td>
<td>If true, allow to merge data types if the two types meet the rules for explicit casting.</td>
</tr>
+ <tr>
+ <td><h5>write.use-v2-write</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, v2 write will be used. Currently, only HASH_FIXED and BUCKET_UNAWARE bucket modes are supported. Will fall back to v1 write for other bucket modes. Currently, Spark V2 write does not support TableCapability.STREAMING_WRITE and TableCapability.ACCEPT_ANY_SCHEMA.</td>
+ </tr>
</tbody>
</table>
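
For illustration only (not part of the commit): a minimal sketch of enabling the new option from a Spark session. The spark.paimon. prefix for session-level connector options, the catalog name, the warehouse path and the table name are assumptions.

    import org.apache.spark.sql.SparkSession

    // Hypothetical session setup; catalog name, warehouse path and table are placeholders.
    val spark = SparkSession
      .builder()
      .appName("paimon-v2-write-demo")
      .config("spark.sql.catalog.paimon", "org.apache.paimon.spark.SparkCatalog")
      .config("spark.sql.catalog.paimon.warehouse", "/tmp/paimon-warehouse")
      // Assumed key: write.use-v2-write exposed under the spark.paimon. prefix.
      .config("spark.paimon.write.use-v2-write", "true")
      .getOrCreate()

    // A plain append; unsupported bucket modes silently fall back to the v1 write path.
    spark.sql("INSERT INTO paimon.default.t VALUES (1, 'a'), (2, 'b')")
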
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index f4890f84bd..56223c36cd 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -20,12 +20,10 @@ package org.apache.paimon.spark.catalyst.analysis.expressions
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.SparkFilterConverter
-import org.apache.paimon.spark.write.SparkWriteBuilder
import org.apache.paimon.types.RowType
import org.apache.spark.sql.PaimonUtils.{normalizeExprs, translateFilter}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
-import org.apache.spark.sql.sources.{AlwaysTrue, And => SourceAnd, EqualNullSafe, EqualTo, Filter => SourceFilter}
trait ExpressionHelper extends ExpressionHelperBase {
@@ -54,39 +52,4 @@ trait ExpressionHelper extends ExpressionHelperBase {
Some(PredicateBuilder.and(predicates: _*))
}
}
-
- /**
- * For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
- * methods where the `AlwaysTrue` Filter is used.
- */
- def isTruncate(filter: SourceFilter): Boolean = {
- val filters = splitConjunctiveFilters(filter)
- filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
- }
-
- /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
- def convertPartitionFilterToMap(
- filter: SourceFilter,
- partitionRowType: RowType): Map[String, String] = {
- // todo: replace it with SparkV2FilterConverter when we drop Spark3.2
- val converter = new SparkFilterConverter(partitionRowType)
- splitConjunctiveFilters(filter).map {
- case EqualNullSafe(attribute, value) =>
- (attribute, converter.convertString(attribute, value))
- case EqualTo(attribute, value) =>
- (attribute, converter.convertString(attribute, value))
- case _ =>
- // Should not happen
- throw new RuntimeException(
- s"Only support Overwrite filters with Equal and EqualNullSafe, but
got: $filter")
- }.toMap
- }
-
- private def splitConjunctiveFilters(filter: SourceFilter): Seq[SourceFilter] = {
- filter match {
- case SourceAnd(filter1, filter2) =>
- splitConjunctiveFilters(filter1) ++ splitConjunctiveFilters(filter2)
- case other => other :: Nil
- }
- }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
index bdc0d2cc29..03f1ff7af4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTable.scala
@@ -23,7 +23,7 @@ import org.apache.paimon.options.Options
import org.apache.paimon.spark.catalog.functions.BucketFunction
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.spark.write.{SparkV2WriteBuilder, SparkWriteBuilder}
+import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
import org.apache.paimon.table.{BucketMode, DataTable, FileStoreTable, KnownSplitsTable, Table}
import org.apache.paimon.utils.StringUtils
@@ -47,7 +47,7 @@ case class SparkTable(table: Table)
with PaimonPartitionManagement {
private lazy val useV2Write: Boolean = {
- val v2WriteConfigured = OptionUtils.useV2Write
+ val v2WriteConfigured = OptionUtils.useV2Write()
v2WriteConfigured && supportsV2Write
}
@@ -55,10 +55,8 @@ case class SparkTable(table: Table)
table match {
case storeTable: FileStoreTable =>
storeTable.bucketMode() match {
- case BucketMode.HASH_FIXED =>
- storeTable.coreOptions().bucket() > 0 && BucketFunction.supportsTable(storeTable)
- case BucketMode.BUCKET_UNAWARE =>
- storeTable.coreOptions().bucket() == BucketMode.UNAWARE_BUCKET
+ case BucketMode.HASH_FIXED => BucketFunction.supportsTable(storeTable)
+ case BucketMode.BUCKET_UNAWARE => true
case _ => false
}
@@ -137,9 +135,9 @@ case class SparkTable(table: Table)
case fileStoreTable: FileStoreTable =>
val options = Options.fromMap(info.options)
if (useV2Write) {
- new SparkV2WriteBuilder(fileStoreTable, info.schema())
+ new PaimonV2WriteBuilder(fileStoreTable, info.schema())
} else {
- new SparkWriteBuilder(fileStoreTable, options)
+ new PaimonWriteBuilder(fileStoreTable, options)
}
case _ =>
throw new RuntimeException("Only FileStoreTable can be written.")
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
index 2cb739ee6d..682cf88fcf 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/expressions/ExpressionHelper.scala
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.catalyst.analysis.expressions
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
import org.apache.paimon.spark.{SparkFilterConverter, SparkV2FilterConverter}
import org.apache.paimon.spark.catalyst.Compatibility
-import org.apache.paimon.spark.write.SparkWriteBuilder
+import org.apache.paimon.spark.write.PaimonWriteBuilder
import org.apache.paimon.types.RowType
import org.apache.spark.sql.{Column, SparkSession}
@@ -66,6 +66,11 @@ trait ExpressionHelper extends ExpressionHelperBase {
Some(PredicateBuilder.and(predicates: _*))
}
}
+}
+
+trait ExpressionHelperBase extends PredicateHelper {
+
+ import ExpressionHelper._
/**
* For the 'INSERT OVERWRITE' semantics of SQL, Spark DataSourceV2 will call the `truncate`
@@ -76,7 +81,7 @@ trait ExpressionHelper extends ExpressionHelperBase {
filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
}
- /** See [[ SparkWriteBuilder#failIfCanNotOverwrite]] */
+ /** See [[ PaimonWriteBuilder#failIfCanNotOverwrite]] */
def convertPartitionFilterToMap(
filter: SourceFilter,
partitionRowType: RowType): Map[String, String] = {
@@ -101,11 +106,6 @@ trait ExpressionHelper extends ExpressionHelperBase {
case other => other :: Nil
}
}
-}
-
-trait ExpressionHelperBase extends PredicateHelper {
-
- import ExpressionHelper._
def toColumn(expr: Expression): Column = {
SparkShimLoader.getSparkShim.column(expr)
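
For readability outside the diff, a sketch of the overwrite helpers that the hunks above move into ExpressionHelperBase; the bodies follow the diff, with the SourceAnd/SourceFilter aliases spelled out:

    import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualTo, Filter}

    // Splits `a AND b AND c` into Seq(a, b, c).
    def splitConjunctiveFilters(filter: Filter): Seq[Filter] = filter match {
      case And(left, right) => splitConjunctiveFilters(left) ++ splitConjunctiveFilters(right)
      case other            => other :: Nil
    }

    // A whole-table INSERT OVERWRITE reaches the builder as a single AlwaysTrue filter.
    def isTruncate(filter: Filter): Boolean = {
      val filters = splitConjunctiveFilters(filter)
      filters.length == 1 && filters.head.isInstanceOf[AlwaysTrue]
    }

    // Example: an overwrite of partition (dt='20250506', hh='10') arrives as
    // And(EqualTo("dt", "20250506"), EqualTo("hh", "10")) and splits into two EqualTo filters.
    val partitionFilters = splitConjunctiveFilters(And(EqualTo("dt", "20250506"), EqualTo("hh", "10")))
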
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 831a7966f3..5ff7b7a0e2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -25,7 +25,6 @@ import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
import org.apache.paimon.data.serializer.InternalSerializers
import org.apache.paimon.deletionvectors.DeletionVector
import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
-import org.apache.paimon.fs.Path
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
import org.apache.paimon.manifest.FileKind
@@ -242,7 +241,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
writeWithoutBucket(data)
case HASH_FIXED =>
- if (table.bucketSpec().getNumBuckets == -2) {
+ if (table.bucketSpec().getNumBuckets == POSTPONE_BUCKET) {
writeWithoutBucket(data)
} else if (paimonExtensionEnabled && BucketFunction.supportsTable(table)) {
// Topology: input -> shuffle by partition & bucket
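
The hunk above only swaps the magic number for the named constant; per the replaced literal, POSTPONE_BUCKET is -2, as in this tiny sketch:

    import org.apache.paimon.table.BucketMode

    // POSTPONE_BUCKET is the named form of the old literal -2 removed by this hunk;
    // postpone-bucket tables skip the bucket shuffle and are written without bucketing.
    def skipBucketShuffle(numBuckets: Int): Boolean =
      numBuckets == BucketMode.POSTPONE_BUCKET
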
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
similarity index 91%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index 3f2018060e..a97cccfbbc 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -35,7 +35,7 @@ import java.io.{IOException, UncheckedIOException}
import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}
-class SparkV2Write(
+class PaimonV2Write(
storeTable: FileStoreTable,
overwriteDynamic: Boolean,
overwritePartitions: Option[Map[String, String]],
@@ -58,7 +58,7 @@ class SparkV2Write(
builder
}
- private val writeRequirement = SparkWriteRequirement(table)
+ private val writeRequirement = PaimonWriteRequirement(table)
override def requiredDistribution(): Distribution = {
val distribution = writeRequirement.distribution
@@ -74,11 +74,19 @@ class SparkV2Write(
override def toBatch: BatchWrite = new PaimonBatchWrite
- override def toString: String =
- if (overwriteDynamic)
- s"PaimonWrite(table=${table.fullName()}, overwriteDynamic=true)"
- else
- s"PaimonWrite(table=${table.fullName()},
overwritePartitions=$overwritePartitions)"
+ override def toString: String = {
+ val overwriteDynamicStr = if (overwriteDynamic) {
+ ", overwriteDynamic=true"
+ } else {
+ ""
+ }
+ val overwritePartitionsStr = overwritePartitions match {
+ case Some(partitions) if partitions.nonEmpty => s", overwritePartitions=$partitions"
+ case Some(_) => ", overwriteTable=true"
+ case None => ""
+ }
+ s"PaimonWrite(table=${table.fullName()}$overwriteDynamicStr$overwritePartitionsStr)"
+ }
private class PaimonBatchWrite extends BatchWrite {
override def createBatchWriterFactory(info: PhysicalWriteInfo): DataWriterFactory =
@@ -91,7 +99,7 @@ class SparkV2Write(
val batchTableCommit = batchWriteBuilder.newCommit()
val commitMessages = messages.collect {
- case taskCommit: TaskCommit => taskCommit.commitMessages
+ case taskCommit: TaskCommit => taskCommit.commitMessages()
case other =>
throw new IllegalArgumentException(s"${other.getClass.getName} is
not supported")
}.flatten
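
A standalone sketch (not part of the commit) reproducing the formatting logic of the refactored toString above; the table name is a placeholder:

    // Reproduction of PaimonV2Write.toString after this commit.
    def describeWrite(
        tableName: String,
        overwriteDynamic: Boolean,
        overwritePartitions: Option[Map[String, String]]): String = {
      val overwriteDynamicStr = if (overwriteDynamic) ", overwriteDynamic=true" else ""
      val overwritePartitionsStr = overwritePartitions match {
        case Some(partitions) if partitions.nonEmpty => s", overwritePartitions=$partitions"
        case Some(_) => ", overwriteTable=true"
        case None => ""
      }
      s"PaimonWrite(table=$tableName$overwriteDynamicStr$overwritePartitionsStr)"
    }

    // describeWrite("default.t", overwriteDynamic = false, Some(Map("dt" -> "20250506")))
    //   -> PaimonWrite(table=default.t, overwritePartitions=Map(dt -> 20250506))
    // describeWrite("default.t", overwriteDynamic = false, Some(Map.empty))
    //   -> PaimonWrite(table=default.t, overwriteTable=true)
    // describeWrite("default.t", overwriteDynamic = false, None)
    //   -> PaimonWrite(table=default.t)
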
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
similarity index 82%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
index a8b9a1c211..462fd3311f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkV2WriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2WriteBuilder.scala
@@ -24,16 +24,16 @@ import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsO
import org.apache.spark.sql.sources.{And, Filter}
import org.apache.spark.sql.types.StructType
-class SparkV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
+class PaimonV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
extends BaseWriteBuilder(table)
with SupportsOverwrite
with SupportsDynamicOverwrite {
private var overwriteDynamic = false
- private var overwritePartitions: Map[String, String] = null
+ private var overwritePartitions: Option[Map[String, String]] = None
override def build =
- new SparkV2Write(table, overwriteDynamic, Option.apply(overwritePartitions), writeSchema)
+ new PaimonV2Write(table, overwriteDynamic, overwritePartitions, writeSchema)
override def overwrite(filters: Array[Filter]): WriteBuilder = {
if (overwriteDynamic) {
@@ -49,17 +49,17 @@ class SparkV2WriteBuilder(table: FileStoreTable, writeSchema: StructType)
}
if (isTruncate(conjunctiveFilters.get)) {
- overwritePartitions = Map.empty[String, String]
+ overwritePartitions = Option.apply(Map.empty[String, String])
} else {
- overwritePartitions =
- convertPartitionFilterToMap(conjunctiveFilters.get, table.schema.logicalPartitionType())
+ overwritePartitions = Option.apply(
+ convertPartitionFilterToMap(conjunctiveFilters.get, table.schema.logicalPartitionType()))
}
this
}
override def overwriteDynamicPartitions(): WriteBuilder = {
- if (overwritePartitions != null) {
+ if (overwritePartitions.isDefined) {
throw new IllegalArgumentException("Cannot overwrite dynamically and by
filter both")
}
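
A rough, illustrative mapping (not part of the commit) of which Spark SQL statements exercise which builder method above; the table, columns and data are placeholders, and the session comes from the earlier sketch:

    // Assumes the `spark` session and a partitioned table paimon.default.t (id, name, dt)
    // with v2 write enabled.

    // Whole-table overwrite: Spark calls truncate(), i.e. overwrite(Array(AlwaysTrue)),
    // and overwritePartitions becomes Some(Map.empty).
    spark.sql("INSERT OVERWRITE paimon.default.t VALUES (1, 'a', '20250506')")

    // Static partition overwrite: overwrite(...) receives EqualTo("dt", "20250506") and
    // overwritePartitions becomes Some(Map("dt" -> "20250506")).
    spark.sql("INSERT OVERWRITE paimon.default.t PARTITION (dt = '20250506') VALUES (1, 'a')")

    // Dynamic mode: Spark calls overwriteDynamicPartitions() instead; mixing it with a
    // filter overwrite on the same builder throws IllegalArgumentException.
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    spark.sql("INSERT OVERWRITE paimon.default.t VALUES (2, 'b', '20250507')")
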
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWrite.scala
similarity index 94%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWrite.scala
index d8c8dc942c..3ed93b0dd3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWrite.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWrite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.sources.InsertableRelation
/** Spark [[V1Write]], it is required to use v1 write for grouping by bucket. */
-class SparkWrite(val table: FileStoreTable, saveMode: SaveMode, options: Options) extends V1Write {
+class PaimonWrite(val table: FileStoreTable, saveMode: SaveMode, options: Options) extends V1Write {
override def toInsertableRelation: InsertableRelation = {
(data: DataFrame, overwrite: Boolean) =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
similarity index 92%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
index bfb76552fb..8ce226d619 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteBuilder.scala
@@ -25,13 +25,13 @@ import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.connector.write.{SupportsOverwrite, WriteBuilder}
import org.apache.spark.sql.sources._
-class SparkWriteBuilder(table: FileStoreTable, options: Options)
+class PaimonWriteBuilder(table: FileStoreTable, options: Options)
extends BaseWriteBuilder(table)
with SupportsOverwrite {
private var saveMode: SaveMode = InsertInto
- override def build = new SparkWrite(table, saveMode, options)
+ override def build = new PaimonWrite(table, saveMode, options)
override def overwrite(filters: Array[Filter]): WriteBuilder = {
failIfCanNotOverwrite(filters)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
similarity index 73%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
index b55d5ca85a..f2e52fda79 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/SparkWriteRequirement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonWriteRequirement.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.write
import org.apache.paimon.table.{BucketMode, FileStoreTable}
+import org.apache.paimon.table.BucketMode._
import org.apache.spark.sql.connector.distributions.{ClusteredDistribution, Distribution, Distributions}
import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}
@@ -26,22 +27,27 @@ import org.apache.spark.sql.connector.expressions.{Expression, Expressions, Sort
import scala.collection.JavaConverters._
/** Distribution requirements of Spark write. */
-case class SparkWriteRequirement(distribution: Distribution, ordering: Array[SortOrder])
+case class PaimonWriteRequirement(distribution: Distribution, ordering: Array[SortOrder])
+
+object PaimonWriteRequirement {
-object SparkWriteRequirement {
private val EMPTY_ORDERING: Array[SortOrder] = Array.empty
- private val EMPTY: SparkWriteRequirement =
- SparkWriteRequirement(Distributions.unspecified(), EMPTY_ORDERING)
+ private val EMPTY: PaimonWriteRequirement =
+ PaimonWriteRequirement(Distributions.unspecified(), EMPTY_ORDERING)
- def apply(table: FileStoreTable): SparkWriteRequirement = {
+ def apply(table: FileStoreTable): PaimonWriteRequirement = {
val bucketSpec = table.bucketSpec()
val bucketTransforms = bucketSpec.getBucketMode match {
- case BucketMode.HASH_FIXED =>
- Seq(
- Expressions.bucket(
- bucketSpec.getNumBuckets,
- bucketSpec.getBucketKeys.asScala.map(quote).toArray: _*))
- case BucketMode.BUCKET_UNAWARE =>
+ case HASH_FIXED =>
+ if (bucketSpec.getNumBuckets == POSTPONE_BUCKET) {
+ Seq.empty
+ } else {
+ Seq(
+ Expressions.bucket(
+ bucketSpec.getNumBuckets,
+ bucketSpec.getBucketKeys.asScala.map(quote).toArray: _*))
+ }
+ case BUCKET_UNAWARE =>
Seq.empty
case _ =>
throw new UnsupportedOperationException(
@@ -58,7 +64,7 @@ object SparkWriteRequirement {
} else {
val distribution: ClusteredDistribution =
Distributions.clustered(clusteringExpressions)
- SparkWriteRequirement(distribution, EMPTY_ORDERING)
+ PaimonWriteRequirement(distribution, EMPTY_ORDERING)
}
}
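
A closing sketch (not part of the commit) of what the requirement resolves to for a fixed-bucket table, using Spark's public connector API; the bucket count and key name are made up and partition transforms are omitted:

    import org.apache.spark.sql.connector.distributions.{Distribution, Distributions}
    import org.apache.spark.sql.connector.expressions.{Expression, Expressions, SortOrder}

    // For a HASH_FIXED table with 8 buckets on column `k`, the clustering reduces to a
    // single bucket transform; BUCKET_UNAWARE and postpone-bucket tables yield no
    // clustering and hence Distributions.unspecified().
    val clustering: Array[Expression] = Array(Expressions.bucket(8, "k"))
    val distribution: Distribution = Distributions.clustered(clustering)
    val ordering: Array[SortOrder] = Array.empty

    // Spark's RequiresDistributionAndOrdering contract then shuffles incoming rows so all
    // rows of a given bucket land in one task before PaimonV2Write writes them.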