This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4769f29683 [spark] Unify config extraction for maxPartitionBytes and openCostInBytes between paimon and spark (#6037)
4769f29683 is described below
commit 4769f29683980174b32ae62aa26f401224036424
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Aug 7 17:37:45 2025 +0800
[spark] Unify config extraction for maxPartitionBytes and openCostInBytes between paimon and spark (#6037)
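
The resolution order is now the same for both values: a Paimon option, when set, wins over the corresponding Spark SQL conf, which in turn wins over Paimon's default. A minimal sketch of that precedence (illustrative only; the actual logic lives in ScanHelper in the diff below):

    // Paimon option > Spark SQL conf > Paimon default
    def resolve(paimonOption: Option[Long], sparkConf: Option[Long], paimonDefault: Long): Long =
      paimonOption.orElse(sparkConf).getOrElse(paimonDefault)

    // openCostInBytes   <- "source.split.open-file-cost" vs "spark.sql.files.openCostInBytes"
    // maxPartitionBytes <- "source.split.target-size"    vs "spark.sql.files.maxPartitionBytes"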
---
.../scala/org/apache/paimon/spark/ScanHelper.scala | 53 ++++++++++++++++++----
.../org/apache/paimon/spark/ScanHelperTest.scala | 31 ++++++++++++-
2 files changed, 74 insertions(+), 10 deletions(-)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index c0b35ec0d1..b14abebd64 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -19,17 +19,20 @@
package org.apache.paimon.spark
import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions._
import org.apache.paimon.io.DataFileMeta
import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.PaimonSparkSession
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.internal.SQLConf
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-trait ScanHelper extends Logging {
+trait ScanHelper extends SQLConfHelper with Logging {
private val spark = PaimonSparkSession.active
@@ -37,12 +40,36 @@ trait ScanHelper extends Logging {
private lazy val deletionVectors: Boolean =
coreOptions.deletionVectorsEnabled()
- private lazy val openCostInBytes: Long = coreOptions.splitOpenFileCost()
+ private lazy val filesMaxPartitionBytes: Long = {
+ val options = coreOptions.toConfiguration
+ var _filesMaxPartitionBytes = SOURCE_SPLIT_TARGET_SIZE.defaultValue().getBytes
+
+ if (conf.contains(SQLConf.FILES_MAX_PARTITION_BYTES.key)) {
+ _filesMaxPartitionBytes = conf.getConf(SQLConf.FILES_MAX_PARTITION_BYTES)
+ }
+ if (options.containsKey(SOURCE_SPLIT_TARGET_SIZE.key())) {
+ _filesMaxPartitionBytes = options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes
+ }
+ _filesMaxPartitionBytes
+ }
+
+ private lazy val openCostInBytes: Long = {
+ val options = coreOptions.toConfiguration
+ var _openCostBytes = SOURCE_SPLIT_OPEN_FILE_COST.defaultValue().getBytes
+
+ if (conf.contains(SQLConf.FILES_OPEN_COST_IN_BYTES.key)) {
+ _openCostBytes = conf.getConf(SQLConf.FILES_OPEN_COST_IN_BYTES)
+ }
+ if (options.containsKey(SOURCE_SPLIT_OPEN_FILE_COST.key())) {
+ _openCostBytes = options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes
+ }
+ _openCostBytes
+ }
private lazy val leafNodeDefaultParallelism: Int = {
- spark.conf
- .get("spark.sql.leafNodeDefaultParallelism",
spark.sparkContext.defaultParallelism.toString)
- .toInt
+ conf
+ .getConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM)
+ .getOrElse(spark.sparkContext.defaultParallelism)
}
def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
@@ -57,7 +84,8 @@ trait ScanHelper extends Logging {
val all = reserved.map(PaimonInputPartition.apply) ++ reshuffled
val duration = System.currentTimeMillis() - startTS
logInfo(
- s"Reshuffle splits from ${toReshuffle.length} to ${reshuffled.length}
in $duration ms. Total number of splits is ${all.length}")
+ s"Reshuffle splits from ${toReshuffle.length} to ${reshuffled.length}
in $duration ms. " +
+ s"Total number of splits is ${all.length}")
all
} else {
splits.map(PaimonInputPartition.apply)
@@ -164,12 +192,19 @@ trait ScanHelper extends Logging {
private def computeMaxSplitBytes(dataSplits: Seq[DataSplit]): Long = {
val dataFiles = dataSplits.flatMap(unpack)
- val defaultMaxSplitBytes = spark.sessionState.conf.filesMaxPartitionBytes
- val minPartitionNum = spark.sessionState.conf.filesMinPartitionNum
+ val defaultMaxSplitBytes = filesMaxPartitionBytes
+ val minPartitionNum = conf.filesMinPartitionNum
.getOrElse(leafNodeDefaultParallelism)
val totalBytes = dataFiles.map(file => file.fileSize + openCostInBytes).sum
val bytesPerCore = totalBytes / minPartitionNum
- Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+ logInfo(
+ s"File open cost in bytes: $openCostInBytes, " +
+ s"default max split bytes: $defaultMaxSplitBytes, " +
+ s"min partition num: $minPartitionNum, " +
+ s"final max split bytes: $maxSplitBytes")
+
+ maxSplitBytes
}
}
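
A rough worked example of the computeMaxSplitBytes logic above, using the numbers from the test added below (100 tiny files, leafNodeDefaultParallelism = 1, defaults of 4 MiB open cost and 128 MiB max partition bytes; real file sizes assumed negligible):

    // default case
    totalBytes    ≈ 100 * 4 MiB = 400 MiB
    bytesPerCore  = 400 MiB / 1 = 400 MiB
    maxSplitBytes = min(128 MiB, max(4 MiB, 400 MiB)) = 128 MiB
    // -> roughly 400 / 128 ≈ 4 input partitions

    // with open-file-cost = 0, totalBytes is just the real data size,
    // so all splits pack into a single partition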
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index febf15f9e9..bb0eccebad 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -113,9 +113,38 @@ class ScanHelperTest extends PaimonSparkTestBase {
Assertions.assertEquals(1, reshuffled.length)
}
+ test("Paimon: set open-file-cost to 0") {
+ withTable("t") {
+ sql("CREATE TABLE t (a INT, b STRING)")
+ for (i <- 1 to 100) {
+ sql(s"INSERT INTO t VALUES ($i, 'a')")
+ }
+
+ def paimonScan() = getPaimonScan("SELECT * FROM t")
+
+ // default openCostInBytes is 4m, so we will get 400 / 128 = 4 partitions
+ withSparkSQLConf("spark.sql.leafNodeDefaultParallelism" -> "1") {
+ assert(paimonScan().lazyInputPartitions.length == 4)
+ }
+
+ withSparkSQLConf(
+ "spark.sql.files.openCostInBytes" -> "0",
+ "spark.sql.leafNodeDefaultParallelism" -> "1") {
+ assert(paimonScan().lazyInputPartitions.length == 1)
+ }
+
+ // Paimon's conf takes precedence over Spark's
+ withSparkSQLConf(
+ "spark.sql.files.openCostInBytes" -> "4194304",
+ "spark.paimon.source.split.open-file-cost" -> "0",
+ "spark.sql.leafNodeDefaultParallelism" -> "1") {
+ assert(paimonScan().lazyInputPartitions.length == 1)
+ }
+ }
+ }
+
class FakeScan extends ScanHelper {
override val coreOptions: CoreOptions =
CoreOptions.fromMap(new JHashMap[String, String]())
}
-
}
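
For reference, the same precedence can be exercised outside the test harness; a small Scala sketch (spark is an active SparkSession and a Paimon table t already exists, both assumed for illustration):

    // Spark's conf alone lowers the estimated per-file open cost
    spark.conf.set("spark.sql.files.openCostInBytes", "0")
    spark.sql("SELECT * FROM t").collect()

    // The Paimon option, when set, takes precedence over the Spark conf
    spark.conf.set("spark.sql.files.openCostInBytes", "4194304")
    spark.conf.set("spark.paimon.source.split.open-file-cost", "0")
    spark.sql("SELECT * FROM t").collect()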