This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4769f29683 [spark] Unify config extraction for maxPartitionBytes and openCostInBytes between paimon and spark (#6037)
4769f29683 is described below

commit 4769f29683980174b32ae62aa26f401224036424
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Aug 7 17:37:45 2025 +0800

    [spark] Unify config extraction for maxPartitionBytes and openCostInBytes between paimon and spark (#6037)
---
 .../scala/org/apache/paimon/spark/ScanHelper.scala | 53 ++++++++++++++++++----
 .../org/apache/paimon/spark/ScanHelperTest.scala   | 31 ++++++++++++-
 2 files changed, 74 insertions(+), 10 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
index c0b35ec0d1..b14abebd64 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
@@ -19,17 +19,20 @@
 package org.apache.paimon.spark
 
 import org.apache.paimon.CoreOptions
+import org.apache.paimon.CoreOptions._
 import org.apache.paimon.io.DataFileMeta
 import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
 import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.PaimonSparkSession
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.internal.SQLConf
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-trait ScanHelper extends Logging {
+trait ScanHelper extends SQLConfHelper with Logging {
 
   private val spark = PaimonSparkSession.active
 
@@ -37,12 +40,36 @@ trait ScanHelper extends Logging {
 
   private lazy val deletionVectors: Boolean = coreOptions.deletionVectorsEnabled()
 
-  private lazy val openCostInBytes: Long = coreOptions.splitOpenFileCost()
+  private lazy val filesMaxPartitionBytes: Long = {
+    val options = coreOptions.toConfiguration
+    var _filesMaxPartitionBytes = SOURCE_SPLIT_TARGET_SIZE.defaultValue().getBytes
+
+    if (conf.contains(SQLConf.FILES_MAX_PARTITION_BYTES.key)) {
+      _filesMaxPartitionBytes = conf.getConf(SQLConf.FILES_MAX_PARTITION_BYTES)
+    }
+    if (options.containsKey(SOURCE_SPLIT_TARGET_SIZE.key())) {
+      _filesMaxPartitionBytes = options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes
+    }
+    _filesMaxPartitionBytes
+  }
+
+  private lazy val openCostInBytes: Long = {
+    val options = coreOptions.toConfiguration
+    var _openCostBytes = SOURCE_SPLIT_OPEN_FILE_COST.defaultValue().getBytes
+
+    if (conf.contains(SQLConf.FILES_OPEN_COST_IN_BYTES.key)) {
+      _openCostBytes = conf.getConf(SQLConf.FILES_OPEN_COST_IN_BYTES)
+    }
+    if (options.containsKey(SOURCE_SPLIT_OPEN_FILE_COST.key())) {
+      _openCostBytes = options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes
+    }
+    _openCostBytes
+  }
 
   private lazy val leafNodeDefaultParallelism: Int = {
-    spark.conf
-      .get("spark.sql.leafNodeDefaultParallelism", spark.sparkContext.defaultParallelism.toString)
-      .toInt
+    conf
+      .getConf(SQLConf.LEAF_NODE_DEFAULT_PARALLELISM)
+      .getOrElse(spark.sparkContext.defaultParallelism)
   }
 
   def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
@@ -57,7 +84,8 @@ trait ScanHelper extends Logging {
       val all = reserved.map(PaimonInputPartition.apply) ++ reshuffled
       val duration = System.currentTimeMillis() - startTS
       logInfo(
-        s"Reshuffle splits from ${toReshuffle.length} to ${reshuffled.length} in $duration ms. Total number of splits is ${all.length}")
+        s"Reshuffle splits from ${toReshuffle.length} to ${reshuffled.length} in $duration ms. " +
+          s"Total number of splits is ${all.length}")
       all
     } else {
       splits.map(PaimonInputPartition.apply)
@@ -164,12 +192,19 @@ trait ScanHelper extends Logging {
 
   private def computeMaxSplitBytes(dataSplits: Seq[DataSplit]): Long = {
     val dataFiles = dataSplits.flatMap(unpack)
-    val defaultMaxSplitBytes = spark.sessionState.conf.filesMaxPartitionBytes
-    val minPartitionNum = spark.sessionState.conf.filesMinPartitionNum
+    val defaultMaxSplitBytes = filesMaxPartitionBytes
+    val minPartitionNum = conf.filesMinPartitionNum
       .getOrElse(leafNodeDefaultParallelism)
     val totalBytes = dataFiles.map(file => file.fileSize + openCostInBytes).sum
     val bytesPerCore = totalBytes / minPartitionNum
 
-    Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
+    logInfo(
+      s"File open cost in bytes: $openCostInBytes, " +
+        s"default max split bytes: $defaultMaxSplitBytes, " +
+        s"min partition num: $minPartitionNum, " +
+        s"final max split bytes: $maxSplitBytes")
+
+    maxSplitBytes
   }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
index febf15f9e9..bb0eccebad 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
@@ -113,9 +113,38 @@ class ScanHelperTest extends PaimonSparkTestBase {
     Assertions.assertEquals(1, reshuffled.length)
   }
 
+  test("Paimon: set open-file-cost to 0") {
+    withTable("t") {
+      sql("CREATE TABLE t (a INT, b STRING)")
+      for (i <- 1 to 100) {
+        sql(s"INSERT INTO t VALUES ($i, 'a')")
+      }
+
+      def paimonScan() = getPaimonScan("SELECT * FROM t")
+
+      // default openCostInBytes is 4m, so we will get 400 / 128 = 4 partitions
+      withSparkSQLConf("spark.sql.leafNodeDefaultParallelism" -> "1") {
+        assert(paimonScan().lazyInputPartitions.length == 4)
+      }
+
+      withSparkSQLConf(
+        "spark.sql.files.openCostInBytes" -> "0",
+        "spark.sql.leafNodeDefaultParallelism" -> "1") {
+        assert(paimonScan().lazyInputPartitions.length == 1)
+      }
+
+      // Paimon's conf takes precedence over Spark's
+      withSparkSQLConf(
+        "spark.sql.files.openCostInBytes" -> "4194304",
+        "spark.paimon.source.split.open-file-cost" -> "0",
+        "spark.sql.leafNodeDefaultParallelism" -> "1") {
+        assert(paimonScan().lazyInputPartitions.length == 1)
+      }
+    }
+  }
+
   class FakeScan extends ScanHelper {
     override val coreOptions: CoreOptions =
       CoreOptions.fromMap(new JHashMap[String, String]())
   }
-
 }

Reply via email to