This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 874d4140d1 [spark] Introduce source.split.target-size-with-column-pruning (#6837)
874d4140d1 is described below
commit 874d4140d1a07bb9543b0a91c65a80483a747356
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Dec 19 19:53:15 2025 +0800
[spark] Introduce source.split.target-size-with-column-pruning (#6837)
---
.../generated/spark_connector_configuration.html | 6 +++++
.../apache/paimon/spark/SparkConnectorOptions.java | 8 ++++++
.../org/apache/paimon/spark/scan/BaseScan.scala | 20 +++++++++++---
.../paimon/spark/scan/BinPackingSplits.scala | 31 +++++++++++-----------
.../paimon/spark/scan/PaimonCopyOnWriteScan.scala | 3 +--
.../paimon/spark/scan/PaimonStatistics.scala | 3 ++-
.../org/apache/paimon/spark/util/OptionUtils.scala | 4 +++
.../org/apache/paimon/spark/util/SplitUtils.scala | 23 ++++++++++++++--
.../apache/paimon/spark/BinPackingSplitsTest.scala | 31 ++++++++++++++++++++++
9 files changed, 104 insertions(+), 25 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/spark_connector_configuration.html b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
index 8f06adecf9..6f16a972c2 100644
--- a/docs/layouts/shortcodes/generated/spark_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/spark_connector_configuration.html
@@ -74,6 +74,12 @@ under the License.
<td>Boolean</td>
<td>Whether to verify SparkSession is initialized with required configurations.</td>
</tr>
+ <tr>
+ <td><h5>source.split.target-size-with-column-pruning</h5></td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Whether to adjust the target split size based on pruned (projected) columns. If enabled, split size estimation uses only the columns actually being read.</td>
+ </tr>
<tr>
<td><h5>write.merge-schema</h5></td>
<td style="word-wrap: break-word;">false</td>
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
index 40ad19f44c..05630caceb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkConnectorOptions.java
@@ -98,4 +98,12 @@ public class SparkConnectorOptions {
.defaultValue(true)
.withDescription(
"Whether to allow full scan when reading a
partitioned table.");
+
+ public static final ConfigOption<Boolean> SOURCE_SPLIT_TARGET_SIZE_WITH_COLUMN_PRUNING =
+ key("source.split.target-size-with-column-pruning")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Whether to adjust the target split size based on
pruned (projected) columns. "
+ + "If enabled, split size estimation uses
only the columns actually being read.");
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
index ffacda9853..dcd3dda67a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
@@ -24,7 +24,7 @@ import org.apache.paimon.predicate.{Predicate, TopN}
import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, SparkTypeUtils}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.spark.util.SplitUtils
+import org.apache.paimon.spark.util.{OptionUtils, SplitUtils}
import org.apache.paimon.table.{SpecialFields, Table}
import org.apache.paimon.table.source.{ReadBuilder, Split}
import org.apache.paimon.types.RowType
@@ -53,8 +53,9 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
// Input splits
def inputSplits: Array[Split]
def inputPartitions: Seq[PaimonInputPartition] = getInputPartitions(inputSplits)
- def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] =
- BinPackingSplits(coreOptions).pack(splits)
+ def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
+ BinPackingSplits(coreOptions, readRowSizeRatio).pack(splits)
+ }
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
@@ -130,6 +131,17 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
table.statistics())
}
+ def readRowSizeRatio: Double = {
+ if (OptionUtils.sourceSplitTargetSizeWithColumnPruning()) {
+ estimateStatistics match {
+ case stats: PaimonStatistics => stats.readRowSizeRatio
+ case _ => 1.0
+ }
+ } else {
+ 1.0
+ }
+ }
+
override def supportedCustomMetrics: Array[CustomMetric] = {
Array(
PaimonNumSplitMetric(),
@@ -140,7 +152,7 @@ trait BaseScan extends Scan with SupportsReportStatistics with Logging {
}
override def reportDriverMetrics(): Array[CustomTaskMetric] = {
- val filesCount = inputSplits.map(SplitUtils.fileCount).sum
+ val filesCount = inputSplits.map(SplitUtils.dataFileCount).sum
Array(
PaimonResultedTableFilesTaskMetric(filesCount)
)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index f22bce3ec1..8516af76a3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -22,7 +22,9 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions._
import org.apache.paimon.io.DataFileMeta
import org.apache.paimon.spark.PaimonInputPartition
+import org.apache.paimon.spark.util.SplitUtils
import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
+import org.apache.paimon.table.format.FormatDataSplit
import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
import org.apache.spark.internal.Logging
@@ -33,7 +35,9 @@ import org.apache.spark.sql.internal.SQLConf
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with Logging {
+case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double = 1.0)
+ extends SQLConfHelper
+ with Logging {
private val spark = PaimonSparkSession.active
@@ -75,6 +79,8 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
val (toReshuffle, reserved) = splits.partition {
case _: FallbackDataSplit => false
case split: DataSplit => split.beforeFiles().isEmpty && split.rawConvertible()
+ // Currently, format table reader only supports reading one file.
+ case _: FormatDataSplit => false
case _ => false
}
if (toReshuffle.nonEmpty) {
@@ -132,8 +138,9 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
val ddFiles = dataFileAndDeletionFiles(split)
ddFiles.foreach {
case (dataFile, deletionFile) =>
- val size = dataFile
- .fileSize() + openCostInBytes + Option(deletionFile).map(_.length()).getOrElse(0L)
+ val size =
+ (dataFile.fileSize() * readRowSizeRatio).toLong + openCostInBytes + Option(
+ deletionFile).map(_.length()).getOrElse(0L)
if (currentSize + size > maxSplitBytes) {
closeInputPartition()
}
@@ -149,14 +156,6 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
partitions.toArray
}
- private def unpack(split: Split): Array[DataFileMeta] = {
- split match {
- case ds: DataSplit =>
- ds.dataFiles().asScala.toArray
- case _ => Array.empty
- }
- }
-
private def copyDataSplit(
split: DataSplit,
dataFiles: Seq[DataFileMeta],
@@ -190,12 +189,12 @@ case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with
}
private def computeMaxSplitBytes(dataSplits: Seq[DataSplit]): Long = {
- val dataFiles = dataSplits.flatMap(unpack)
val defaultMaxSplitBytes = filesMaxPartitionBytes
- val minPartitionNum = conf.filesMinPartitionNum
- .getOrElse(leafNodeDefaultParallelism)
- val totalBytes = dataFiles.map(file => file.fileSize + openCostInBytes).sum
- val bytesPerCore = totalBytes / minPartitionNum
+ val minPartitionNum = conf.filesMinPartitionNum.getOrElse(leafNodeDefaultParallelism)
+
+ val totalRawBytes =
+ dataSplits.map(s => SplitUtils.splitSize(s) + SplitUtils.fileCount(s) * openCostInBytes).sum
+ val bytesPerCore = totalRawBytes / minPartitionNum
val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
logInfo(
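Note (reviewer sketch, not part of this patch): a minimal illustration of how readRowSizeRatio changes the size each file contributes during bin packing; the file sizes and column counts below are made up, and the conf values mirror the test further down.

    // Ten 30 MB data files, a 32 MB target split size, zero open cost.
    val maxSplitBytes = 32L * 1024 * 1024
    val openCostInBytes = 0L
    val fileSizes = Seq.fill(10)(30L * 1024 * 1024)
    // Reading 1 of 10 similarly sized columns gives a ratio of roughly 0.1.
    val readRowSizeRatio = 0.1
    val effectiveSizes = fileSizes.map(s => (s * readRowSizeRatio).toLong + openCostInBytes)
    // Each file now counts as ~3 MB, so all ten files pack into a single input
    // partition; with the default ratio of 1.0 each 30 MB file would land in its own.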
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
index 6ee1ba38df..3b4efbde0d 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
@@ -20,7 +20,6 @@ package org.apache.paimon.spark.scan
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
-import org.apache.paimon.spark.PaimonBatch
import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
import org.apache.paimon.table.{FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -28,7 +27,7 @@ import org.apache.paimon.table.source.{DataSplit, Split}
import org.apache.spark.sql.PaimonUtils
import org.apache.spark.sql.connector.expressions.{Expressions, NamedReference}
import org.apache.spark.sql.connector.expressions.filter.{Predicate => SparkPredicate}
-import org.apache.spark.sql.connector.read.{Batch, SupportsRuntimeV2Filtering}
+import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
import org.apache.spark.sql.sources.{Filter, In}
import org.apache.spark.sql.types.StructType
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
index 4ffd9ae64e..bb4fa377cd 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
@@ -68,7 +68,8 @@ case class PaimonStatistics(
}
}
- lazy val readRowSizeRatio: Double = estimateRowSize(readRowType) / estimateRowSize(tableRowType)
+ lazy val readRowSizeRatio: Double =
+ estimateRowSize(readRowType).toDouble / estimateRowSize(tableRowType)
private def estimateRowSize(rowType: RowType): Long = {
rowType.getFields.asScala.map(estimateFieldSize).sum
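Note (reviewer sketch, not part of this patch): the intent of readRowSizeRatio in plain numbers; the per-type widths below are illustrative assumptions, not the values produced by estimateFieldSize.

    // Table row: one INT plus nine STRING columns; only s1 is projected.
    val tableRowSize = 4L + 9 * 16L                              // assumed widths: INT = 4, STRING = 16
    val readRowSize = 16L                                        // projected columns only
    val readRowSizeRatio = readRowSize.toDouble / tableRowSize   // roughly 0.11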
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
index acbb378586..2e6014ff1a 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/OptionUtils.scala
@@ -111,6 +111,10 @@ object OptionUtils extends SQLConfHelper with Logging {
getOptionString(SparkConnectorOptions.READ_ALLOW_FULL_SCAN).toBoolean
}
+ def sourceSplitTargetSizeWithColumnPruning(): Boolean = {
+ getOptionString(SparkConnectorOptions.SOURCE_SPLIT_TARGET_SIZE_WITH_COLUMN_PRUNING).toBoolean
+ }
+
private def mergeSQLConf(extraOptions: JMap[String, String]): JMap[String, String] = {
val mergedOptions = new JHashMap[String, String](
conf.getAllConfs
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
index e2c14845c6..038f3ae307 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
@@ -21,24 +21,43 @@ package org.apache.paimon.spark.util
import org.apache.paimon.table.format.FormatDataSplit
import org.apache.paimon.table.source.{DataSplit, Split}
+import java.util.{Collections => JCollections}
+
import scala.collection.JavaConverters._
object SplitUtils {
def splitSize(split: Split): Long = {
split match {
- case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
+ case ds: DataSplit =>
+ ds.dataFiles().asScala.map(_.fileSize).sum
case fs: FormatDataSplit =>
if (fs.length() == null) fs.fileSize() else fs.length().longValue()
case _ => 0
}
}
- def fileCount(split: Split): Long = {
+ def fileCount(split: Split): Long = dataFileCount(split) + deleteFileCount(split)
+
+ def dataFileCount(split: Split): Long = {
split match {
case ds: DataSplit => ds.dataFiles().size()
case _: FormatDataSplit => 1
case _ => 0
}
}
+
+ def deleteFileCount(split: Split): Long = {
+ split match {
+ case ds: DataSplit =>
+ ds.deletionFiles()
+ .orElse(JCollections.emptyList())
+ .asScala
+ .filter(_ != null)
+ .map(_.path())
+ .distinct
+ .size
+ case _ => 0
+ }
+ }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
index 72941b8c0d..994dd2cb09 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
@@ -145,4 +145,35 @@ class BinPackingSplitsTest extends PaimonSparkTestBase {
}
}
}
+
+ test("Paimon: get read splits with column pruning") {
+ withTable("t") {
+ sql(
+ "CREATE TABLE t (a INT, s1 STRING, s2 STRING, s3 STRING, s4 STRING, s5
STRING, s6 STRING, s7 STRING, s8 STRING, s9 STRING)")
+ sql(
+ "INSERT INTO t SELECT /*+ REPARTITION(10) */ id, uuid(), uuid(),
uuid(), uuid(), uuid(), uuid(), uuid(), uuid(), uuid() FROM range(1000000)")
+
+ withSparkSQLConf(
+ "spark.sql.files.minPartitionNum" -> "1",
+ "spark.sql.files.openCostInBytes" -> "0",
+ "spark.paimon.source.split.target-size" -> "32m") {
+ for (splitSizeWithColumnPruning <- Seq("true", "false")) {
+ withSparkSQLConf(
+ "spark.paimon.source.split.target-size-with-column-pruning" ->
splitSizeWithColumnPruning) {
+ // Select one col
+ var partitionCount = getPaimonScan("SELECT s1 FROM t").inputPartitions.length
+ if (splitSizeWithColumnPruning.toBoolean) {
+ assert(partitionCount == 1)
+ } else {
+ assert(partitionCount > 1)
+ }
+
+ // Select all cols
+ partitionCount = getPaimonScan("SELECT * FROM t").inputPartitions.length
+ assert(partitionCount > 1)
+ }
+ }
+ }
+ }
+ }
}