This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 874d4140d1 [spark] Introduce source.split.target-size-with-column-pruning (#6837)
874d4140d1 is described below

commit 874d4140d1a07bb9543b0a91c65a80483a747356
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Dec 19 19:53:15 2025 +0800

    [spark] Introduce source.split.target-size-with-column-pruning (#6837)
---
 .../generated/spark_connector_configuration.html   |  6 +++++
 .../apache/paimon/spark/SparkConnectorOptions.java |  8 ++++++
 .../org/apache/paimon/spark/scan/BaseScan.scala    | 20 +++++++++++---
 .../paimon/spark/scan/BinPackingSplits.scala       | 31 +++++++++++-----------
 .../paimon/spark/scan/PaimonCopyOnWriteScan.scala  |  3 +--
 .../paimon/spark/scan/PaimonStatistics.scala       |  3 ++-
 .../org/apache/paimon/spark/util/OptionUtils.scala |  4 +++
 .../org/apache/paimon/spark/util/SplitUtils.scala  | 23 ++++++++++++++--
 .../apache/paimon/spark/BinPackingSplitsTest.scala | 31 ++++++++++++++++++++++
 9 files changed, 104 insertions(+), 25 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 8f06adecf9..6f16a972c2 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -74,6 +74,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether to verify SparkSession is initialized with required configurations.</td>
         </tr>
+        <tr>
+            <td><h5>source.split.target-size-with-column-pruning</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to adjust the target split size based on pruned (projected) columns. If enabled, split size estimation uses only the columns actually being read.</td>
+        </tr>
         <tr>
             <td><h5>write.merge-schema</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 40ad19f44c..05630caceb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -98,4 +98,12 @@ public class SparkConnectorOptions {
                     .defaultValue(true)
                     .withDescription(
                             "Whether to allow full scan when reading a partitioned table.");
+
+    public static final ConfigOption<Boolean> SOURCE_SPLIT_TARGET_SIZE_WITH_COLUMN_PRUNING =
+            key("source.split.target-size-with-column-pruning")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to adjust the target split size based on pruned (projected) columns. "
+                                    + "If enabled, split size estimation uses only the columns actually being read.");
 }
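
    The new option is read through OptionUtils, so it can also be toggled per session
    with the usual spark.paimon.* SQL conf prefix (the same route the test at the end of
    this patch uses). A minimal sketch, assuming a spark-shell session and an existing
    Paimon table named t:

        // Sketch only: enable the option for the current session, then run a
        // narrow projection so split sizing reflects the pruned columns.
        spark.sql("SET spark.paimon.source.split.target-size-with-column-pruning=true")
        spark.sql("SELECT s1 FROM t").show()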
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
index ffacda9853..dcd3dda67a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.predicate.{Predicate, TopN}
 import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, SparkTypeUtils}
 import org.apache.paimon.spark.schema.PaimonMetadataColumn
 import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.spark.util.SplitUtils
+import org.apache.paimon.spark.util.{OptionUtils, SplitUtils}
 import org.apache.paimon.table.{SpecialFields, Table}
 import org.apache.paimon.table.source.{ReadBuilder, Split}
 import org.apache.paimon.types.RowType
@@ -53,8 +53,9 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
   // Input splits
   def inputSplits: Array[Split]
   def inputPartitions: Seq[PaimonInputPartition] = getInputPartitions(inputSplits)
-  def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] =
-    BinPackingSplits(coreOptions).pack(splits)
+  def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
+    BinPackingSplits(coreOptions, readRowSizeRatio).pack(splits)
+  }
 
   val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
 
@@ -130,6 +131,17 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
       table.statistics())
   }
 
+  def readRowSizeRatio: Double = {
+    if (OptionUtils.sourceSplitTargetSizeWithColumnPruning()) {
+      estimateStatistics match {
+        case stats: PaimonStatistics => stats.readRowSizeRatio
+        case _ => 1.0
+      }
+    } else {
+      1.0
+    }
+  }
+
   override def supportedCustomMetrics: Array[CustomMetric] = {
     Array(
       PaimonNumSplitMetric(),
@@ -140,7 +152,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
   }
 
   override def reportDriverMetrics(): Array[CustomTaskMetric] = {
-    val filesCount = inputSplits.map(SplitUtils.fileCount).sum
+    val filesCount = inputSplits.map(SplitUtils.dataFileCount).sum
     Array(
       PaimonResultedTableFilesTaskMetric(filesCount)
     )
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index f22bce3ec1..8516af76a3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -22,7 +22,9 @@ import org.apache.paimon.CoreOptions
 import org.apache.paimon.CoreOptions._
 import org.apache.paimon.io.DataFileMeta
 import org.apache.paimon.spark.PaimonInputPartition
+import org.apache.paimon.spark.util.SplitUtils
 import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
+import org.apache.paimon.table.format.FormatDataSplit
 import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
 import org.apache.spark.internal.Logging
@@ -33,7 +35,9 @@ import org.apache.spark.sql.internal.SQLConf
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with Logging {
+case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double = 1.0)
+  extends SQLConfHelper
+  with Logging {
 
   private val spark = PaimonSparkSession.active
 
@@ -75,6 +79,8 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
     val (toReshuffle, reserved) = splits.partition {
       case _: FallbackDataSplit => false
      case split: DataSplit => split.beforeFiles().isEmpty && split.rawConvertible()
+      // Currently, format table reader only supports reading one file.
+      case _: FormatDataSplit => false
       case _ => false
     }
     if (toReshuffle.nonEmpty) {
@@ -132,8 +138,9 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
         val ddFiles = dataFileAndDeletionFiles(split)
         ddFiles.foreach {
           case (dataFile, deletionFile) =>
-            val size = dataFile
-              .fileSize() + openCostInBytes + Option(deletionFile).map(_.length()).getOrElse(0L)
+            val size =
+              (dataFile.fileSize() * readRowSizeRatio).toLong + openCostInBytes + Option(
+                deletionFile).map(_.length()).getOrElse(0L)
             if (currentSize + size > maxSplitBytes) {
               closeInputPartition()
             }
@@ -149,14 +156,6 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
     partitions.toArray
   }
 
-  private def unpack(split: Split): Array[DataFileMeta] = {
-    split match {
-      case ds: DataSplit =>
-        ds.dataFiles().asScala.toArray
-      case _ => Array.empty
-    }
-  }
-
   private def copyDataSplit(
       split: DataSplit,
       dataFiles: Seq[DataFileMeta],
@@ -190,12 +189,12 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
   }
 
   private def computeMaxSplitBytes(dataSplits: Seq[DataSplit]): Long = {
-    val dataFiles = dataSplits.flatMap(unpack)
     val defaultMaxSplitBytes = filesMaxPartitionBytes
-    val minPartitionNum = conf.filesMinPartitionNum
-      .getOrElse(leafNodeDefaultParallelism)
-    val totalBytes = dataFiles.map(file => file.fileSize + openCostInBytes).sum
-    val bytesPerCore = totalBytes / minPartitionNum
+    val minPartitionNum = conf.filesMinPartitionNum.getOrElse(leafNodeDefaultParallelism)
+
+    val totalRawBytes =
+      dataSplits.map(s => SplitUtils.splitSize(s) + SplitUtils.fileCount(s) * openCostInBytes).sum
+    val bytesPerCore = totalRawBytes / minPartitionNum
 
     val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
     logInfo(
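
    With this change the per-file weight used during packing is the raw file size scaled
    by readRowSizeRatio, plus the open cost and any deletion-file length. A small
    illustrative calculation (made-up numbers, not taken from this patch):

        // Illustrative only: a 128 MB file read through a narrow projection.
        val fileSize = 128L * 1024 * 1024        // raw data file size
        val openCostInBytes = 4L * 1024 * 1024   // spark.sql.files.openCostInBytes
        val deletionLength = 0L                  // no deletion vector attached
        val readRowSizeRatio = 0.1               // projected row ~10% of the full row
        val weight = (fileSize * readRowSizeRatio).toLong + openCostInBytes + deletionLength
        // weight is ~16.8 MB instead of ~132 MB, so roughly 8x more files can be
        // packed under the same source.split.target-size budget.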
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
index 6ee1ba38df..3b4efbde0d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
@@ -20,7 +20,6 @@ package org.apache.paimon.spark.scan
 
 import org.apache.paimon.partition.PartitionPredicate
 import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
-import org.apache.paimon.spark.PaimonBatch
 import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
 import org.apache.paimon.table.{FileStoreTable, InnerTable}
 import org.apache.paimon.table.source.{DataSplit, Split}
@@ -28,7 +27,7 @@ import org.apache.paimon.table.source.{DataSplit, Split}
 import org.apache.spark.sql.PaimonUtils
 import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
 import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
-import org.apache.spark.sql.connector.read.{Batch, SupportsRuntimeV2Filtering}
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.sources.{Filter, In}
 import org.apache.spark.sql.types.StructType
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
index 4ffd9ae64e..bb4fa377cd 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
@@ -68,7 +68,8 @@ case class PaimonStatistics(
     }
   }
 
-  lazy val readRowSizeRatio: Double = estimateRowSize(readRowType) / estimateRowSize(tableRowType)
+  lazy val readRowSizeRatio: Double =
+    estimateRowSize(readRowType).toDouble / estimateRowSize(tableRowType)
 
   private def estimateRowSize(rowType: RowType): Long = {
     rowType.getFields.asScala.map(estimateFieldSize).sum
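
    readRowSizeRatio compares the estimated width of the projected row against the full
    table row; the .toDouble above is needed because both estimates are Longs, so the old
    expression truncated the ratio to 0 or 1 before it became a Double. A hypothetical
    illustration with assumed per-field size estimates:

        // Assumed estimates only: 1 INT column (4 bytes) and 9 STRING columns
        // (20 bytes each), mirroring the shape of the table in the test below.
        val tableRowSize = 4L + 9 * 20L       // full row estimate = 184
        val readRowSize = 20L                 // SELECT s1 reads one STRING column
        val readRowSizeRatio = readRowSize.toDouble / tableRowSize
        // ~0.109: each file is weighted at about 11% of its raw size when packing.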
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index acbb378586..2e6014ff1a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -111,6 +111,10 @@ object OptionUtils extends SQLConfHelper with Logging {
     getOptionString(SparkConnectorOptions.READ_ALLOW_FULL_SCAN).toBoolean
   }
 
+  def sourceSplitTargetSizeWithColumnPruning(): Boolean = {
+    getOptionString(SparkConnectorOptions.SOURCE_SPLIT_TARGET_SIZE_WITH_COLUMN_PRUNING).toBoolean
+  }
+
   private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
     val mergedOptions = new JHashMap[String, String](
       conf.getAllConfs
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
index e2c14845c6..038f3ae307 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
@@ -21,24 +21,43 @@ package org.apache.paimon.spark.util
 import org.apache.paimon.table.format.FormatDataSplit
 import org.apache.paimon.table.source.{DataSplit, Split}
 
+import java.util.{Collections => JCollections}
+
 import scala.collection.JavaConverters._
 
 object SplitUtils {
 
   def splitSize(split: Split): Long = {
     split match {
-      case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
+      case ds: DataSplit =>
+        ds.dataFiles().asScala.map(_.fileSize).sum
       case fs: FormatDataSplit =>
         if (fs.length() == null) fs.fileSize() else fs.length().longValue()
       case _ => 0
     }
   }
 
-  def fileCount(split: Split): Long = {
+  def fileCount(split: Split): Long = dataFileCount(split) + deleteFileCount(split)
+
+  def dataFileCount(split: Split): Long = {
     split match {
       case ds: DataSplit => ds.dataFiles().size()
       case _: FormatDataSplit => 1
       case _ => 0
     }
   }
+
+  def deleteFileCount(split: Split): Long = {
+    split match {
+      case ds: DataSplit =>
+        ds.deletionFiles()
+          .orElse(JCollections.emptyList())
+          .asScala
+          .filter(_ != null)
+          .map(_.path())
+          .distinct
+          .size
+      case _ => 0
+    }
+  }
 }
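
    fileCount now also counts deletion files, deduplicated by path and ignoring null
    entries, while dataFileCount keeps the old behaviour for the driver metric. A
    plain-Scala sketch of the dedup step (assumed values, not real splits):

        // Assumed paths: two data files share one deletion vector, one has none.
        val deletionPaths: Seq[String] = Seq("dv-1", "dv-1", null)
        val deleteFiles = deletionPaths.filter(_ != null).distinct.size  // 1
        val dataFiles = 3
        val totalForOpenCost = dataFiles + deleteFiles                   // 4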
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
index 72941b8c0d..994dd2cb09 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
@@ -145,4 +145,35 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon: get read splits with column pruning") {
+    withTable("t") {
+      sql(
+        "CREATE TABLE t (a INT, s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5 STRING, s6 STRING, s7 STRING, s8 STRING, s9 STRING)")
+      sql(
+        "INSERT INTO t SELECT  /*+ REPARTITION(10) */ id, uuid(), uuid(), uuid(), uuid(), uuid(), uuid(), uuid(), uuid(), uuid() FROM range(1000000)")
+
+      withSparkSQLConf(
+        "spark.sql.files.minPartitionNum" -> "1",
+        "spark.sql.files.openCostInBytes" -> "0",
+        "spark.paimon.source.split.target-size" -> "32m") {
+        for (splitSizeWithColumnPruning <- Seq("true", "false")) {
+          withSparkSQLConf(
+            "spark.paimon.source.split.target-size-with-column-pruning" -> splitSizeWithColumnPruning) {
+            // Select one col
+            var partitionCount = getPaimonScan("SELECT s1 FROM t").inputPartitions.length
+            if (splitSizeWithColumnPruning.toBoolean) {
+              assert(partitionCount == 1)
+            } else {
+              assert(partitionCount > 1)
+            }
+
+            // Select all cols
+            partitionCount = getPaimonScan("SELECT * FROM t").inputPartitions.length
+            assert(partitionCount > 1)
+          }
+        }
+      }
+    }
+  }
 }
