This is an automated email from the ASF dual-hosted git repository.
zouxxyy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b8c3932a48 [spark] Refactor BaseScan and PaimonStatistics (#6833)
b8c3932a48 is described below
commit b8c3932a483dc43660f4211c83cbde4ced956b06
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Dec 19 10:12:45 2025 +0800
[spark] Refactor BaseScan and PaimonStatistics (#6833)
---
.../paimon/spark/PaimonFormatTableScan.scala | 6 +-
.../scala/org/apache/paimon/spark/PaimonScan.scala | 3 +-
.../paimon/spark/statistics/StatisticsHelper.scala | 30 ------
.../scala/org/apache/paimon/spark/PaimonScan.scala | 8 +-
.../paimon/spark/statistics/StatisticsHelper.scala | 30 ------
.../paimon/spark/FormatTableStatistics.scala | 57 -----------
.../org/apache/paimon/spark/PaimonBaseScan.scala | 66 +++++--------
.../paimon/spark/PaimonFormatTableBaseScan.scala | 53 +++-------
.../paimon/spark/PaimonPartitionReader.scala | 12 +--
.../scala/org/apache/paimon/spark/PaimonScan.scala | 27 ++----
.../apache/paimon/spark/PaimonScanBuilder.scala | 1 +
.../spark/PaimonSparkCopyOnWriteOperation.scala | 1 +
.../apache/paimon/spark/PaimonSparkTableBase.scala | 1 +
.../DisableUnnecessaryPaimonBucketedScan.scala | 2 +-
.../BaseScan.scala} | 58 ++++++++---
.../BinPackingSplits.scala} | 13 ++-
.../spark/{ => scan}/PaimonCopyOnWriteScan.scala | 41 +++-----
.../paimon/spark/{ => scan}/PaimonLocalScan.scala | 2 +-
.../paimon/spark/{ => scan}/PaimonSplitScan.scala | 31 +-----
.../paimon/spark/{ => scan}/PaimonStatistics.scala | 87 ++++++++++-------
.../spark/statistics/StatisticsHelperBase.scala | 108 ---------------------
.../SplitUtils.scala} | 30 ++++--
.../paimon/spark/write/BaseV2WriteBuilder.scala | 2 +-
.../apache/paimon/spark/write/PaimonV2Write.scala | 4 +-
...HelperTest.scala => BinPackingSplitsTest.scala} | 22 ++---
.../paimon/spark/sql/DeletionVectorTest.scala | 3 +-
.../apache/paimon/spark/sql/PaimonMetricTest.scala | 7 +-
.../paimon/spark/sql/PaimonPushDownTestBase.scala | 24 ++---
.../spark/sql/SparkV2FilterConverterTestBase.scala | 2 +-
.../paimon/spark/util/ScanPlanHelperTest.scala | 2 +-
30 files changed, 235 insertions(+), 498 deletions(-)
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
index e9734d238b..467172401b 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonFormatTableScan.scala
@@ -38,8 +38,7 @@ case class PaimonFormatTableScan(
pushedDataFilters: Seq[Predicate],
override val pushedLimit: Option[Int] = None)
extends PaimonFormatTableBaseScan
- with SupportsRuntimeFiltering
- with ScanHelper {
+ with SupportsRuntimeFiltering {
override def filterAttributes(): Array[NamedReference] = {
val requiredFields = readBuilder.readType().getFieldNames.asScala
@@ -61,7 +60,8 @@ case class PaimonFormatTableScan(
if (partitionFilter.nonEmpty) {
readBuilder.withFilter(partitionFilter.head)
// set inputPartitions null to trigger to get the new splits.
- inputPartitions = null
+ _inputSplits = null
+ _inputPartitions = null
}
}
}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 68b89d5ecc..e9eaa7d6cc 100644
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -61,7 +61,8 @@ case class PaimonScan(
if (partitionFilter.nonEmpty) {
readBuilder.withFilter(partitionFilter.head)
// set inputPartitions null to trigger to get the new splits.
- inputPartitions = null
+ _inputSplits = null
+ _inputPartitions = null
}
}
}
diff --git a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
deleted file mode 100644
index e64785ddee..0000000000
--- a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.statistics
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-
-trait StatisticsHelper extends StatisticsHelperBase {
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): logical.Statistics = {
-    DataSourceV2Relation.transformV2Stats(v2Stats, None, conf.defaultSizeInBytes)
- }
-}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 7c0a4d0c17..3afca15303 100644
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -81,7 +81,7 @@ case class PaimonScan(
/** Extract the bucket number from the splits only if all splits have the same totalBuckets number. */
private def extractBucketNumber(): Option[Int] = {
- val splits = getOriginSplits
+ val splits = inputSplits
if (splits.exists(!_.isInstanceOf[DataSplit])) {
None
} else {
@@ -102,7 +102,7 @@ case class PaimonScan(
// Since Spark 3.3
override def outputPartitioning: Partitioning = {
extractBucketTransform
-      .map(bucket => new KeyGroupedPartitioning(Array(bucket), lazyInputPartitions.size))
+      .map(bucket => new KeyGroupedPartitioning(Array(bucket), inputPartitions.size))
.getOrElse(new UnknownPartitioning(0))
}
@@ -142,8 +142,8 @@ case class PaimonScan(
if (partitionFilter.nonEmpty) {
readBuilder.withFilter(partitionFilter.head)
// set inputPartitions null to trigger to get the new splits.
- inputPartitions = null
- inputSplits = null
+ _inputPartitions = null
+ _inputSplits = null
}
}
}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala b/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
deleted file mode 100644
index e64785ddee..0000000000
--- a/paimon-spark/paimon-spark-3.3/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.statistics
-
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
-
-trait StatisticsHelper extends StatisticsHelperBase {
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): logical.Statistics = {
-    DataSourceV2Relation.transformV2Stats(v2Stats, None, conf.defaultSizeInBytes)
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
deleted file mode 100644
index 8863a259ef..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/FormatTableStatistics.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark
-
-import org.apache.paimon.table.format.FormatDataSplit
-import org.apache.paimon.types.RowType
-
-import org.apache.spark.sql.connector.read.Statistics
-
-import java.util.OptionalLong
-
-import scala.collection.JavaConverters._
-
-case class FormatTableStatistics[T <: PaimonFormatTableBaseScan](scan: T) extends Statistics {
-
- private lazy val fileTotalSize: Long =
- scan.getOriginSplits
- .map(_.asInstanceOf[FormatDataSplit])
- .map(
- split => {
- if (split.length() != null) {
- split.length().longValue()
- } else {
- split.fileSize()
- }
- })
- .sum
-
- override def sizeInBytes(): OptionalLong = {
- val size = fileTotalSize /
- estimateRowSize(scan.tableRowType) *
- estimateRowSize(scan.readTableRowType)
- OptionalLong.of(size)
- }
-
- private def estimateRowSize(rowType: RowType): Long = {
- rowType.getFields.asScala.map(_.`type`().defaultSize().toLong).sum
- }
-
- override def numRows(): OptionalLong = OptionalLong.empty()
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
index c0b6717269..7976ea6f63 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScan.scala
@@ -20,44 +20,31 @@ package org.apache.paimon.spark
import org.apache.paimon.annotation.VisibleForTesting
import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.spark.scan.BaseScan
import org.apache.paimon.spark.sources.PaimonMicroBatchStream
import org.apache.paimon.spark.util.OptionUtils
-import org.apache.paimon.stats
import org.apache.paimon.table.{DataTable, FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{InnerTableScan, Split}
+import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
-import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
+import org.apache.spark.sql.connector.read.Batch
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream
-import org.apache.spark.sql.types.StructType
-
-import java.util.Optional
import scala.collection.JavaConverters._
-abstract class PaimonBaseScan(table: InnerTable)
- extends Scan
- with SupportsReportStatistics
- with ScanHelper
- with ColumnPruningAndPushDown {
-
- protected var inputPartitions: Seq[PaimonInputPartition] = _
-
- protected var inputSplits: Array[Split] = _
-
- lazy val statistics: Optional[stats.Statistics] = table.statistics()
+abstract class PaimonBaseScan(table: InnerTable) extends BaseScan with SQLConfHelper {
private lazy val paimonMetricsRegistry: SparkMetricRegistry = SparkMetricRegistry()
- lazy val requiredStatsSchema: StructType = {
- val fieldNames = readTableRowType.getFields.asScala.map(_.name)
- StructType(tableSchema.filter(field => fieldNames.contains(field.name)))
- }
+ // May recalculate the splits after executing runtime filter push down.
+ protected var _inputSplits: Array[Split] = _
+ protected var _inputPartitions: Seq[PaimonInputPartition] = _
@VisibleForTesting
- def getOriginSplits: Array[Split] = {
- if (inputSplits == null) {
- inputSplits = readBuilder
+ def inputSplits: Array[Split] = {
+ if (_inputSplits == null) {
+ _inputSplits = readBuilder
.newScan()
.asInstanceOf[InnerTableScan]
.withMetricRegistry(paimonMetricsRegistry)
@@ -66,40 +53,33 @@ abstract class PaimonBaseScan(table: InnerTable)
.asScala
.toArray
}
- inputSplits
+ _inputSplits
}
- final def lazyInputPartitions: Seq[PaimonInputPartition] = {
- if (inputPartitions == null) {
- inputPartitions = getInputPartitions(getOriginSplits)
+ final override def inputPartitions: Seq[PaimonInputPartition] = {
+ if (_inputPartitions == null) {
+ _inputPartitions = getInputPartitions(inputSplits)
}
- inputPartitions
+ _inputPartitions
}
override def toBatch: Batch = {
ensureNoFullScan()
-    PaimonBatch(lazyInputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
+ super.toBatch
}
override def toMicroBatchStream(checkpointLocation: String): MicroBatchStream = {
new PaimonMicroBatchStream(table.asInstanceOf[DataTable], readBuilder, checkpointLocation)
}
- override def estimateStatistics(): Statistics = {
- PaimonStatistics(this)
- }
-
override def supportedCustomMetrics: Array[CustomMetric] = {
- Array(
- PaimonNumSplitMetric(),
- PaimonPartitionSizeMetric(),
- PaimonReadBatchTimeMetric(),
- PaimonPlanningDurationMetric(),
- PaimonScannedSnapshotIdMetric(),
- PaimonScannedManifestsMetric(),
- PaimonSkippedTableFilesMetric(),
- PaimonResultedTableFilesMetric()
- )
+ super.supportedCustomMetrics ++
+ Array(
+ PaimonPlanningDurationMetric(),
+ PaimonScannedSnapshotIdMetric(),
+ PaimonScannedManifestsMetric(),
+ PaimonSkippedTableFilesMetric()
+ )
}
override def reportDriverMetrics(): Array[CustomTaskMetric] = {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
index 0d84664639..707aee0948 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonFormatTableBaseScan.scala
@@ -18,63 +18,34 @@
package org.apache.paimon.spark
+import org.apache.paimon.spark.scan.BaseScan
import org.apache.paimon.table.FormatTable
import org.apache.paimon.table.source.Split
-import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
-import org.apache.spark.sql.connector.read.{Batch, Statistics, SupportsReportStatistics}
-
import scala.collection.JavaConverters._
/** Base Scan implementation for [[FormatTable]]. */
-abstract class PaimonFormatTableBaseScan
- extends ColumnPruningAndPushDown
- with SupportsReportStatistics
- with ScanHelper {
+abstract class PaimonFormatTableBaseScan extends BaseScan {
- protected var inputSplits: Array[Split] = _
- protected var inputPartitions: Seq[PaimonInputPartition] = _
+ protected var _inputSplits: Array[Split] = _
+ protected var _inputPartitions: Seq[PaimonInputPartition] = _
- def getOriginSplits: Array[Split] = {
- if (inputSplits == null) {
- inputSplits = readBuilder
+ def inputSplits: Array[Split] = {
+ if (_inputSplits == null) {
+ _inputSplits = readBuilder
.newScan()
.plan()
.splits()
.asScala
.toArray
}
- inputSplits
+ _inputSplits
}
- final def lazyInputPartitions: Seq[PaimonInputPartition] = {
- if (inputPartitions == null) {
- inputPartitions = getInputPartitions(getOriginSplits)
+ final override def inputPartitions: Seq[PaimonInputPartition] = {
+ if (_inputPartitions == null) {
+ _inputPartitions = getInputPartitions(inputSplits)
}
- inputPartitions
- }
-
- override def toBatch: Batch = {
-    PaimonBatch(lazyInputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
- }
-
- override def estimateStatistics(): Statistics = {
- FormatTableStatistics(this)
- }
-
- override def supportedCustomMetrics: Array[CustomMetric] = {
- Array(
- PaimonNumSplitMetric(),
- PaimonPartitionSizeMetric(),
- PaimonReadBatchTimeMetric(),
- PaimonResultedTableFilesMetric()
- )
- }
-
- override def reportDriverMetrics(): Array[CustomTaskMetric] = {
- val filesCount = getOriginSplits.length
- Array(
- PaimonResultedTableFilesTaskMetric(filesCount)
- )
+ _inputPartitions
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
index 2040982687..fd8a6178c4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionReader.scala
@@ -23,8 +23,8 @@ import org.apache.paimon.disk.IOManager
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.spark.data.SparkInternalRow
import org.apache.paimon.spark.schema.PaimonMetadataColumn
-import org.apache.paimon.table.format.FormatDataSplit
-import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
+import org.apache.paimon.spark.util.SplitUtils
+import org.apache.paimon.table.source.{ReadBuilder, Split}
import org.apache.paimon.types.RowType
import org.apache.spark.sql.catalyst.InternalRow
@@ -114,13 +114,7 @@ case class PaimonPartitionReader(
// Partition metrics need to be computed only once.
private lazy val partitionMetrics: Array[CustomTaskMetric] = {
val numSplits = partition.splits.length
- val splitSize = partition.splits.map {
- case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
- case fs: FormatDataSplit =>
- if (fs.length() == null) fs.fileSize() else fs.length().longValue()
- case _ => 0
- }.sum
-
+ val splitSize = partition.splits.map(SplitUtils.splitSize).sum
Array(
PaimonNumSplitsTaskMetric(numSplits),
PaimonPartitionSizeTaskMetric(splitSize)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index f8a6e89df8..06a97ee8b4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -42,8 +42,11 @@ case class PaimonScan(
override val pushedLimit: Option[Int],
override val pushedTopN: Option[TopN],
bucketedScanDisabled: Boolean = false)
- extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
+ extends PaimonBaseScan(table)
+ with SupportsReportPartitioning
+ with SupportsReportOrdering
with SupportsRuntimeV2Filtering {
+
def disableBucketedScan(): PaimonScan = {
copy(bucketedScanDisabled = true)
}
@@ -71,19 +74,10 @@ case class PaimonScan(
if (partitionFilter.nonEmpty) {
readBuilder.withFilter(partitionFilter.toList.asJava)
// set inputPartitions null to trigger to get the new splits.
- inputPartitions = null
- inputSplits = null
+ _inputPartitions = null
+ _inputSplits = null
}
}
-}
-
-abstract class PaimonScanCommon(
- table: InnerTable,
- requiredSchema: StructType,
- bucketedScanDisabled: Boolean = false)
- extends PaimonBaseScan(table)
- with SupportsReportPartitioning
- with SupportsReportOrdering {
@transient
private lazy val extractBucketTransform: Option[Transform] = {
@@ -122,7 +116,7 @@ abstract class PaimonScanCommon(
/** Extract the bucket number from the splits only if all splits have the same totalBuckets number. */
private def extractBucketNumber(): Option[Int] = {
- val splits = getOriginSplits
+ val splits = inputSplits
if (splits.exists(!_.isInstanceOf[DataSplit])) {
None
} else {
@@ -143,15 +137,14 @@ abstract class PaimonScanCommon(
// Since Spark 3.3
override def outputPartitioning: Partitioning = {
extractBucketTransform
-      .map(bucket => new KeyGroupedPartitioning(Array(bucket), lazyInputPartitions.size))
+      .map(bucket => new KeyGroupedPartitioning(Array(bucket), inputPartitions.size))
.getOrElse(new UnknownPartitioning(0))
}
// Since Spark 3.4
override def outputOrdering(): Array[SortOrder] = {
if (
- !shouldDoBucketedScan || lazyInputPartitions.exists(
- !_.isInstanceOf[PaimonBucketedInputPartition])
+      !shouldDoBucketedScan || inputPartitions.exists(!_.isInstanceOf[PaimonBucketedInputPartition])
) {
return Array.empty
}
@@ -164,7 +157,7 @@ abstract class PaimonScanCommon(
return Array.empty
}
- val allSplitsKeepOrdering = lazyInputPartitions.toSeq
+ val allSplitsKeepOrdering = inputPartitions.toSeq
.map(_.asInstanceOf[PaimonBucketedInputPartition])
.map(_.splits.asInstanceOf[Seq[DataSplit]])
.forall {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index c61ce42b80..6eeaaf7b93 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate._
import org.apache.paimon.predicate.SortValue.{NullOrdering, SortDirection}
import org.apache.paimon.spark.aggregate.AggregatePushDownUtils.tryPushdownAggregation
+import org.apache.paimon.spark.scan.PaimonLocalScan
import org.apache.paimon.table.{FileStoreTable, InnerTable}
import org.apache.spark.sql.connector.expressions
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
index 96256468e9..b939ba8ef5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkCopyOnWriteOperation.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.options.Options
import org.apache.paimon.spark.catalog.functions.BucketFunction
+import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.spark.write.PaimonV2WriteBuilder
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
index 22758dea0f..867de1109c 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSparkTableBase.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.BucketFunctionType
import org.apache.paimon.options.Options
import org.apache.paimon.spark.catalog.functions.BucketFunction
+import org.apache.paimon.spark.scan.PaimonSplitScanBuilder
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.util.OptionUtils
import org.apache.paimon.spark.write.{PaimonV2WriteBuilder, PaimonWriteBuilder}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala
index db3c8fc692..b0101ded21 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/adaptive/DisableUnnecessaryPaimonBucketedScan.scala
@@ -153,7 +153,7 @@ object DisableUnnecessaryPaimonBucketedScan extends Rule[SparkPlan] {
plan match {
case batch: BatchScanExec =>
batch.scan match {
-        case scan: PaimonScan if scan.lazyInputPartitions.forall(_.bucketed) =>
+ case scan: PaimonScan if scan.inputPartitions.forall(_.bucketed) =>
Some((batch, scan))
case _ => None
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
similarity index 71%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
index 98daf2eaef..ffacda9853 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ColumnPruningAndPushDown.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BaseScan.scala
@@ -16,24 +16,28 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
import org.apache.paimon.CoreOptions
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.{Predicate, TopN}
+import org.apache.paimon.spark.{PaimonBatch, PaimonInputPartition, PaimonNumSplitMetric, PaimonPartitionSizeMetric, PaimonReadBatchTimeMetric, PaimonResultedTableFilesMetric, PaimonResultedTableFilesTaskMetric, SparkTypeUtils}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
+import org.apache.paimon.spark.util.SplitUtils
import org.apache.paimon.table.{SpecialFields, Table}
-import org.apache.paimon.table.source.ReadBuilder
+import org.apache.paimon.table.source.{ReadBuilder, Split}
import org.apache.paimon.types.RowType
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.connector.read.Scan
+import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
+import org.apache.spark.sql.connector.read.{Batch, Scan, Statistics, SupportsReportStatistics}
import org.apache.spark.sql.types.StructType
import scala.collection.JavaConverters._
-trait ColumnPruningAndPushDown extends Scan with Logging {
+/** Base scan. */
+trait BaseScan extends Scan with SupportsReportStatistics with Logging {
def table: Table
@@ -46,6 +50,12 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
def pushedLimit: Option[Int] = None
def pushedTopN: Option[TopN] = None
+ // Input splits
+ def inputSplits: Array[Split]
+  def inputPartitions: Seq[PaimonInputPartition] = getInputPartitions(inputSplits)
+ def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] =
+ BinPackingSplits(coreOptions).pack(splits)
+
val coreOptions: CoreOptions = CoreOptions.fromMap(table.options())
lazy val tableRowType: RowType = {
@@ -56,12 +66,6 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
}
}
-  lazy val tableSchema: StructType = SparkTypeUtils.fromPaimonRowType(tableRowType)
-
- final def partitionType: StructType = {
- SparkTypeUtils.toSparkPartitionType(table)
- }
-
private[paimon] val (readTableRowType, metadataFields) = {
requiredSchema.fields.foreach(f => checkMetadataColumn(f.name))
val (_requiredTableFields, _metadataFields) =
@@ -102,10 +106,6 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
_readBuilder.dropStats()
}
- final def metadataColumns: Seq[PaimonMetadataColumn] = {
-    metadataFields.map(field => PaimonMetadataColumn.get(field.name, partitionType))
- }
-
override def readSchema(): StructType = {
val _readSchema = StructType(
SparkTypeUtils.fromPaimonRowType(readTableRowType).fields ++ metadataFields)
@@ -116,6 +116,36 @@ trait ColumnPruningAndPushDown extends Scan with Logging {
_readSchema
}
+ override def toBatch: Batch = {
+ val metadataColumns = metadataFields.map(
+      field => PaimonMetadataColumn.get(field.name, SparkTypeUtils.toSparkPartitionType(table)))
+    PaimonBatch(inputPartitions, readBuilder, coreOptions.blobAsDescriptor(), metadataColumns)
+ }
+
+ def estimateStatistics: Statistics = {
+ PaimonStatistics(
+ inputSplits,
+ SparkTypeUtils.toPaimonRowType(readSchema()),
+ table.rowType(),
+ table.statistics())
+ }
+
+ override def supportedCustomMetrics: Array[CustomMetric] = {
+ Array(
+ PaimonNumSplitMetric(),
+ PaimonPartitionSizeMetric(),
+ PaimonReadBatchTimeMetric(),
+ PaimonResultedTableFilesMetric()
+ )
+ }
+
+ override def reportDriverMetrics(): Array[CustomTaskMetric] = {
+ val filesCount = inputSplits.map(SplitUtils.fileCount).sum
+ Array(
+ PaimonResultedTableFilesTaskMetric(filesCount)
+ )
+ }
+
override def description(): String = {
val pushedPartitionFiltersStr = if (pushedPartitionFilters.nonEmpty) {
", PartitionFilters: [" + pushedPartitionFilters.mkString(",") + "]"
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
similarity index 94%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index b14abebd64..f22bce3ec1 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/ScanHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -16,11 +16,12 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions._
import org.apache.paimon.io.DataFileMeta
+import org.apache.paimon.spark.PaimonInputPartition
import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackDataSplit
import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
@@ -32,12 +33,10 @@ import org.apache.spark.sql.internal.SQLConf
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-trait ScanHelper extends SQLConfHelper with Logging {
+case class BinPackingSplits(coreOptions: CoreOptions) extends SQLConfHelper with Logging {
private val spark = PaimonSparkSession.active
- val coreOptions: CoreOptions
-
private lazy val deletionVectors: Boolean = coreOptions.deletionVectorsEnabled()
private lazy val filesMaxPartitionBytes: Long = {
@@ -72,7 +71,7 @@ trait ScanHelper extends SQLConfHelper with Logging {
.getOrElse(spark.sparkContext.defaultParallelism)
}
- def getInputPartitions(splits: Array[Split]): Seq[PaimonInputPartition] = {
+ def pack(splits: Array[Split]): Seq[PaimonInputPartition] = {
val (toReshuffle, reserved) = splits.partition {
case _: FallbackDataSplit => false
case split: DataSplit => split.beforeFiles().isEmpty &&
split.rawConvertible()
@@ -80,7 +79,7 @@ trait ScanHelper extends SQLConfHelper with Logging {
}
if (toReshuffle.nonEmpty) {
val startTS = System.currentTimeMillis()
-      val reshuffled = getInputPartitions(toReshuffle.collect { case ds: DataSplit => ds })
+      val reshuffled = packDataSplit(toReshuffle.collect { case ds: DataSplit => ds })
val all = reserved.map(PaimonInputPartition.apply) ++ reshuffled
val duration = System.currentTimeMillis() - startTS
logInfo(
@@ -92,7 +91,7 @@ trait ScanHelper extends SQLConfHelper with Logging {
}
}
-  private def getInputPartitions(splits: Array[DataSplit]): Array[PaimonInputPartition] = {
+  private def packDataSplit(splits: Array[DataSplit]): Array[PaimonInputPartition] = {
val maxSplitBytes = computeMaxSplitBytes(splits)
var currentSize = 0L
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
similarity index 77%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
index 7e45f4be1b..6ee1ba38df 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonCopyOnWriteScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonCopyOnWriteScan.scala
@@ -16,10 +16,11 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
import org.apache.paimon.partition.PartitionPredicate
-import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.predicate.{Predicate, PredicateBuilder}
+import org.apache.paimon.spark.PaimonBatch
import org.apache.paimon.spark.schema.PaimonMetadataColumn.FILE_PATH_COLUMN
import org.apache.paimon.table.{FileStoreTable, InnerTable}
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -40,21 +41,15 @@ case class PaimonCopyOnWriteScan(
table: InnerTable,
requiredSchema: StructType,
pushedPartitionFilters: Seq[PartitionPredicate],
- pushedDataFilters: Seq[Predicate],
- bucketedScanDisabled: Boolean = false)
- extends PaimonScanCommon(table, requiredSchema, bucketedScanDisabled)
+ pushedDataFilters: Seq[Predicate])
+ extends BaseScan
with SupportsRuntimeV2Filtering {
- var filteredLocations: mutable.Set[String] = mutable.Set[String]()
-
- var filteredFileNames: mutable.Set[String] = mutable.Set[String]()
+  override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
+ private val filteredFileNames: mutable.Set[String] = mutable.Set[String]()
var dataSplits: Array[DataSplit] = Array()
- def disableBucketedScan(): PaimonCopyOnWriteScan = {
- copy(bucketedScanDisabled = true)
- }
-
override def filterAttributes(): Array[NamedReference] = {
Array(Expressions.column(FILE_PATH_COLUMN))
}
@@ -66,7 +61,6 @@ case class PaimonCopyOnWriteScan(
case in: In if in.attribute.equalsIgnoreCase(FILE_PATH_COLUMN) =>
for (value <- in.values) {
val location = value.asInstanceOf[String]
- filteredLocations.add(location)
filteredFileNames.add(Paths.get(location).getFileName.toString)
}
case _ => logWarning("Unsupported runtime filter")
@@ -79,26 +73,17 @@ case class PaimonCopyOnWriteScan(
if (fileStoreTable.coreOptions().manifestDeleteFileDropStats()) {
snapshotReader.dropStats()
}
-
- pushedPartitionFilters.foreach(snapshotReader.withPartitionFilter)
-
- pushedDataFilters.foreach(snapshotReader.withFilter)
-
+ if (pushedPartitionFilters.nonEmpty) {
+      snapshotReader.withPartitionFilter(PartitionPredicate.and(pushedPartitionFilters.asJava))
+ }
+ if (pushedDataFilters.nonEmpty) {
+      snapshotReader.withFilter(PredicateBuilder.and(pushedDataFilters.asJava))
+ }
snapshotReader.withDataFileNameFilter(fileName => filteredFileNames.contains(fileName))
-
dataSplits =
snapshotReader.read().splits().asScala.collect { case s: DataSplit => s }.toArray
case _ => throw new RuntimeException("Only FileStoreTable support.")
}
-
- }
-
- override def toBatch: Batch = {
- PaimonBatch(
- getInputPartitions(dataSplits.asInstanceOf[Array[Split]]),
- readBuilder,
- coreOptions.blobAsDescriptor(),
- metadataColumns)
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonLocalScan.scala
similarity index 97%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonLocalScan.scala
index b4f8b3b785..7218e9e226 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonLocalScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonLocalScan.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.table.Table
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonSplitScan.scala
similarity index 67%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonSplitScan.scala
index f5998ff73b..87185264f4 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonSplitScan.scala
@@ -16,14 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
import org.apache.paimon.partition.PartitionPredicate
import org.apache.paimon.predicate.Predicate
+import org.apache.paimon.spark.{PaimonBaseScanBuilder, PaimonBatch}
import org.apache.paimon.table.{InnerTable, KnownSplitsTable}
import org.apache.paimon.table.source.{DataSplit, Split}
-import org.apache.spark.sql.connector.metric.{CustomMetric, CustomTaskMetric}
import org.apache.spark.sql.connector.read.{Batch, Scan}
import org.apache.spark.sql.types.StructType
@@ -46,30 +46,7 @@ case class PaimonSplitScan(
requiredSchema: StructType,
pushedPartitionFilters: Seq[PartitionPredicate],
pushedDataFilters: Seq[Predicate])
- extends ColumnPruningAndPushDown
- with ScanHelper {
+ extends BaseScan {
- override def toBatch: Batch = {
- PaimonBatch(
- getInputPartitions(dataSplits.asInstanceOf[Array[Split]]),
- readBuilder,
- coreOptions.blobAsDescriptor(),
- metadataColumns)
- }
-
- override def supportedCustomMetrics: Array[CustomMetric] = {
- Array(
- PaimonNumSplitMetric(),
- PaimonPartitionSizeMetric(),
- PaimonReadBatchTimeMetric(),
- PaimonResultedTableFilesMetric()
- )
- }
-
- override def reportDriverMetrics(): Array[CustomTaskMetric] = {
- val filesCount = dataSplits.map(_.dataFiles().size).sum
- Array(
- PaimonResultedTableFilesTaskMetric(filesCount)
- )
- }
+  override def inputSplits: Array[Split] = dataSplits.asInstanceOf[Array[Split]]
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
similarity index 61%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
index cd8dcd1165..4ffd9ae64e 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonStatistics.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/PaimonStatistics.scala
@@ -16,11 +16,14 @@
* limitations under the License.
*/
-package org.apache.paimon.spark
+package org.apache.paimon.spark.scan
+import org.apache.paimon.spark.DataConverter
+import org.apache.paimon.spark.util.SplitUtils
import org.apache.paimon.stats
import org.apache.paimon.stats.ColStats
-import org.apache.paimon.types.{DataField, DataType}
+import org.apache.paimon.table.source.Split
+import org.apache.paimon.types.{DataField, DataType, RowType}
import org.apache.spark.sql.PaimonUtils
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
@@ -32,48 +35,64 @@ import java.util.{Optional, OptionalLong}
import scala.collection.JavaConverters._
-case class PaimonStatistics[T <: PaimonBaseScan](scan: T) extends Statistics {
-
- import PaimonImplicits._
+case class PaimonStatistics(
+ splits: Array[Split],
+ readRowType: RowType,
+ tableRowType: RowType,
+ paimonStats: Optional[stats.Statistics]
+) extends Statistics {
+
+ lazy val numRows: OptionalLong = {
+ if (splits.exists(_.rowCount() == -1)) {
+ OptionalLong.empty()
+ } else {
+ OptionalLong.of(splits.map(_.rowCount()).sum)
+ }
+ }
- private lazy val paimonStats: Option[stats.Statistics] = scan.statistics
+ lazy val sizeInBytes: OptionalLong = {
+ if (numRows.isPresent) {
+ val sizeInBytes = numRows.getAsLong * estimateRowSize(readRowType)
+ // Avoid return 0 bytes if there are some valid rows.
+ // Avoid return too small size in bytes which may less than row count,
+ // note the compression ratio on disk is usually bigger than memory.
+ OptionalLong.of(Math.max(sizeInBytes, numRows.getAsLong))
+ } else {
+ val fileTotalSize = splits.map(SplitUtils.splitSize).sum
+ if (fileTotalSize == 0) {
+ OptionalLong.empty()
+ } else {
+ val size = (fileTotalSize * readRowSizeRatio).toLong
+ OptionalLong.of(size)
+ }
+ }
+ }
-  private lazy val rowCount: Long = scan.lazyInputPartitions.map(_.rowCount()).sum
+  lazy val readRowSizeRatio: Double = estimateRowSize(readRowType) / estimateRowSize(tableRowType)
- private lazy val scannedTotalSize: Long = {
- val readSchemaSize =
-      SparkTypeUtils.toPaimonRowType(scan.readSchema()).getFields.asScala.map(getSizeForField).sum
- val sizeInBytes = rowCount * readSchemaSize
- // Avoid return 0 bytes if there are some valid rows.
- // Avoid return too small size in bytes which may less than row count,
- // note the compression ratio on disk is usually bigger than memory.
- Math.max(sizeInBytes, rowCount)
+ private def estimateRowSize(rowType: RowType): Long = {
+ rowType.getFields.asScala.map(estimateFieldSize).sum
}
- private def getSizeForField(field: DataField): Long = {
- paimonStats match {
- case Some(stats) =>
- val colStat = stats.colStats().get(field.name())
- if (colStat != null && colStat.avgLen().isPresent) {
- colStat.avgLen().getAsLong
- } else {
- field.`type`().defaultSize().toLong
- }
- case _ =>
+ private def estimateFieldSize(field: DataField): Long = {
+ if (paimonStats.isPresent) {
+ val colStat = paimonStats.get.colStats().get(field.name())
+ if (colStat != null && colStat.avgLen().isPresent) {
+ colStat.avgLen().getAsLong
+ } else {
field.`type`().defaultSize().toLong
+ }
+ } else {
+ field.`type`().defaultSize().toLong
}
}
- override def numRows(): OptionalLong = OptionalLong.of(rowCount)
-
- override def sizeInBytes(): OptionalLong = OptionalLong.of(scannedTotalSize)
-
-  override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
- val requiredFields = scan.requiredStatsSchema.fieldNames
+  override lazy val columnStats: java.util.Map[NamedReference, ColumnStatistics] = {
+ val requiredFields = readRowType.getFieldNames.asScala
val resultMap = new java.util.HashMap[NamedReference, ColumnStatistics]()
- if (paimonStats.isDefined) {
+ if (paimonStats.isPresent) {
val paimonColStats = paimonStats.get.colStats()
- scan.tableRowType.getFields.asScala
+ tableRowType.getFields.asScala
.filter {
field => requiredFields.contains(field.name) &&
paimonColStats.containsKey(field.name())
}
@@ -113,7 +132,7 @@ object PaimonColumnStats {
}
def apply(v1ColStats: ColumnStat): PaimonColumnStats = {
- import PaimonImplicits._
+ import org.apache.paimon.spark.PaimonImplicits._
PaimonColumnStats(
if (v1ColStats.nullCount.isDefined)
OptionalLong.of(v1ColStats.nullCount.get.longValue)
else OptionalLong.empty(),
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
deleted file mode 100644
index 627c6a1688..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelperBase.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.statistics
-
-import org.apache.paimon.spark.PaimonColumnStats
-
-import org.apache.spark.sql.PaimonUtils
-import org.apache.spark.sql.catalyst.{SQLConfHelper, StructFilters}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BoundReference, Expression}
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.FilterEstimation
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.expressions.NamedReference
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
-import org.apache.spark.sql.sources.{And, Filter}
-import org.apache.spark.sql.types.StructType
-
-import java.util.OptionalLong
-
-trait StatisticsHelperBase extends SQLConfHelper {
-
- val requiredStatsSchema: StructType
-
- private lazy val replacedStatsSchema =
- CharVarcharUtils.replaceCharVarcharWithStringInSchema(requiredStatsSchema)
-
-  def filterStatistics(v2Stats: Statistics, filters: Seq[Filter]): Statistics = {
- val attrs: Seq[AttributeReference] =
-      replacedStatsSchema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
- val condition = filterToCondition(filters, attrs)
-
- if (condition.isDefined && v2Stats.numRows().isPresent) {
- val filteredStats = FilterEstimation(
-        logical.Filter(condition.get, FakePlanWithStats(toV1Stats(v2Stats, attrs)))).estimate.get
- toV2Stats(filteredStats)
- } else {
- v2Stats
- }
- }
-
-  private def filterToCondition(filters: Seq[Filter], attrs: Seq[Attribute]): Option[Expression] = {
- StructFilters.filterToExpression(filters.reduce(And), toRef).map {
- expression =>
- expression.transform {
- case ref: BoundReference =>
- attrs.find(_.name == replacedStatsSchema(ref.ordinal).name).get
- }
- }
- }
-
- private def toRef(attr: String): Option[BoundReference] = {
- val index = replacedStatsSchema.fieldIndex(attr)
- val field = replacedStatsSchema(index)
- Option.apply(BoundReference(index, field.dataType, field.nullable))
- }
-
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): logical.Statistics
-
- private def toV2Stats(v1Stats: logical.Statistics): Statistics = {
- new Statistics() {
-      override def sizeInBytes(): OptionalLong = if (v1Stats.sizeInBytes != null)
- OptionalLong.of(v1Stats.sizeInBytes.longValue)
- else OptionalLong.empty()
-
- override def numRows(): OptionalLong = if (v1Stats.rowCount.isDefined)
- OptionalLong.of(v1Stats.rowCount.get.longValue)
- else OptionalLong.empty()
-
-      override def columnStats(): java.util.Map[NamedReference, ColumnStatistics] = {
-        val columnStatsMap = new java.util.HashMap[NamedReference, ColumnStatistics]()
- v1Stats.attributeStats.foreach {
- case (attr, v1ColStats) =>
- columnStatsMap.put(
- PaimonUtils.fieldReference(attr.name),
- PaimonColumnStats(v1ColStats)
- )
- }
- columnStatsMap
- }
- }
- }
-}
-
-case class FakePlanWithStats(v1Stats: logical.Statistics) extends LogicalPlan {
- override def output: Seq[Attribute] = Seq.empty
- override def children: Seq[LogicalPlan] = Seq.empty
- override protected def withNewChildrenInternal(
-    newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = throw new UnsupportedOperationException
- override def stats: logical.Statistics = v1Stats
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
similarity index 55%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
index d54ac05fb4..e2c14845c6 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/statistics/StatisticsHelper.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SplitUtils.scala
@@ -16,15 +16,29 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.statistics
+package org.apache.paimon.spark.util
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.connector.read.Statistics
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.paimon.table.format.FormatDataSplit
+import org.apache.paimon.table.source.{DataSplit, Split}
-trait StatisticsHelper extends StatisticsHelperBase {
-  protected def toV1Stats(v2Stats: Statistics, attrs: Seq[Attribute]): logical.Statistics = {
-    DataSourceV2Relation.transformV2Stats(v2Stats, None, conf.defaultSizeInBytes, attrs)
+import scala.collection.JavaConverters._
+
+object SplitUtils {
+
+ def splitSize(split: Split): Long = {
+ split match {
+ case ds: DataSplit => ds.dataFiles().asScala.map(_.fileSize).sum
+ case fs: FormatDataSplit =>
+ if (fs.length() == null) fs.fileSize() else fs.length().longValue()
+ case _ => 0
+ }
+ }
+
+ def fileCount(split: Split): Long = {
+ split match {
+ case ds: DataSplit => ds.dataFiles().size()
+ case _: FormatDataSplit => 1
+ case _ => 0
+ }
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
index 17c8e9800f..dda04245cf 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/BaseV2WriteBuilder.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.write
-import org.apache.paimon.spark.PaimonCopyOnWriteScan
+import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
import org.apache.paimon.table.Table
import org.apache.spark.sql.connector.write.{SupportsDynamicOverwrite, SupportsOverwrite, WriteBuilder}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
index ba89e84e2e..5a46e208f5 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/write/PaimonV2Write.scala
@@ -26,6 +26,7 @@ import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.commands.{SchemaHelper, SparkDataFileMeta}
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.metric.SparkMetricRegistry
+import org.apache.paimon.spark.scan.PaimonCopyOnWriteScan
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage, CommitMessageImpl, TableWriteImpl}
import org.apache.paimon.table.source.DataSplit
@@ -216,11 +217,8 @@ private case class CopyOnWriteBatchWrite(
batchTableCommit.truncateTable()
} else {
val touchedFiles = candidateFiles(scan.get.dataSplits)
-
val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
-
val addCommitMessages = WriteTaskResult.merge(messages)
-
val commitMessages = addCommitMessages ++ deletedCommitMessage
batchTableCommit.withMetricRegistry(metricRegistry)
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
similarity index 87%
rename from paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
rename to paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
index cf0fdc91cd..72941b8c0d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/BinPackingSplitsTest.scala
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.io.DataFileMeta
import org.apache.paimon.manifest.FileSource
+import org.apache.paimon.spark.scan.BinPackingSplits
import org.apache.paimon.table.source.{DataSplit, Split}
import org.junit.jupiter.api.Assertions
@@ -31,7 +32,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._
import scala.collection.mutable
-class ScanHelperTest extends PaimonSparkTestBase {
+class BinPackingSplitsTest extends PaimonSparkTestBase {
test("Paimon: reshuffle splits") {
withSparkSQLConf(("spark.sql.leafNodeDefaultParallelism", "20")) {
@@ -73,8 +74,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
.build()
}
- val fakeScan = new FakeScan()
- val reshuffled = fakeScan.getInputPartitions(dataSplits.toArray)
+ val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
+ val reshuffled = binPacking.pack(dataSplits.toArray)
Assertions.assertTrue(reshuffled.length > 5)
}
}
@@ -110,8 +111,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
.build()
)
- val fakeScan = new FakeScan()
- val reshuffled = fakeScan.getInputPartitions(dataSplits)
+ val binPacking = BinPackingSplits(CoreOptions.fromMap(new JHashMap()))
+ val reshuffled = binPacking.pack(dataSplits)
Assertions.assertEquals(1, reshuffled.length)
}
@@ -126,13 +127,13 @@ class ScanHelperTest extends PaimonSparkTestBase {
// default openCostInBytes is 4m, so we will get 400 / 128 = 4 partitions
withSparkSQLConf("spark.sql.leafNodeDefaultParallelism" -> "1") {
- assert(paimonScan().lazyInputPartitions.length == 4)
+ assert(paimonScan().inputPartitions.length == 4)
}
withSparkSQLConf(
"spark.sql.files.openCostInBytes" -> "0",
"spark.sql.leafNodeDefaultParallelism" -> "1") {
- assert(paimonScan().lazyInputPartitions.length == 1)
+ assert(paimonScan().inputPartitions.length == 1)
}
// Paimon's conf takes precedence over Spark's
@@ -140,13 +141,8 @@ class ScanHelperTest extends PaimonSparkTestBase {
"spark.sql.files.openCostInBytes" -> "4194304",
"spark.paimon.source.split.open-file-cost" -> "0",
"spark.sql.leafNodeDefaultParallelism" -> "1") {
- assert(paimonScan().lazyInputPartitions.length == 1)
+ assert(paimonScan().inputPartitions.length == 1)
}
}
}
-
- class FakeScan extends ScanHelper {
- override val coreOptions: CoreOptions =
- CoreOptions.fromMap(new JHashMap[String, String]())
- }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 2eb9aa253e..1dedb2cfca 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -21,7 +21,8 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.deletionvectors.{BucketedDvMaintainer, BucketedDvMaintainerTest, DeletionVector}
import org.apache.paimon.fs.Path
-import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan}
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.scan.PaimonSplitScan
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.FileStoreTable
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
index d776a00f93..b75ac07d7c 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonMetricTest.scala
@@ -18,8 +18,9 @@
package org.apache.paimon.spark.sql
-import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan}
import org.apache.paimon.spark.PaimonMetrics.{RESULTED_TABLE_FILES, SCANNED_SNAPSHOT_ID, SKIPPED_TABLE_FILES}
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.spark.scan.PaimonSplitScan
import org.apache.paimon.spark.util.ScanPlanHelper
import org.apache.paimon.table.source.DataSplit
@@ -52,7 +53,7 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
resultedTableFiles: Long): Unit = {
val scan = getPaimonScan(s)
// call getInputPartitions to trigger scan
- scan.lazyInputPartitions
+ scan.inputPartitions
val metrics = scan.reportDriverMetrics()
Assertions.assertEquals(scannedSnapshotId, metric(metrics, SCANNED_SNAPSHOT_ID))
Assertions.assertEquals(skippedTableFiles, metric(metrics, SKIPPED_TABLE_FILES))
@@ -78,7 +79,7 @@ class PaimonMetricTest extends PaimonSparkTestBase with ScanPlanHelper {
sql(s"INSERT INTO T VALUES (1, 'a'), (2, 'b')")
sql(s"INSERT INTO T VALUES (3, 'c')")
- val splits = getPaimonScan("SELECT * FROM T").getOriginSplits.map(_.asInstanceOf[DataSplit])
+ val splits = getPaimonScan("SELECT * FROM T").inputSplits.map(_.asInstanceOf[DataSplit])
val df = createDataset(spark, createNewScanPlan(splits, createRelationV2("T")))
val scan = df.queryExecution.optimizedPlan
.collectFirst { case relation: DataSourceV2ScanRelation => relation }
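
For context on the PaimonMetricTest change above: lazyInputPartitions is now inputPartitions, and touching it is what forces split planning on the driver before reportDriverMetrics() is read. A minimal sketch of that flow, reusing the test's own helpers (getPaimonScan and metric are test utilities, not public API):

    // Sketch only: getPaimonScan and metric(...) come from the test code above.
    val scan = getPaimonScan("SELECT * FROM T")
    scan.inputPartitions // accessing it triggers split planning on the driver
    val metrics = scan.reportDriverMetrics()
    val scannedSnapshot = metric(metrics, SCANNED_SNAPSHOT_ID) // compare against the expected snapshot id
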
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
index c23694e1d1..4ca4580aea 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTestBase.scala
@@ -215,12 +215,12 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
val scanBuilder = getScanBuilder()
Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
- val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertTrue(dataSplitsWithoutLimit.length >= 2)
// It still returns false even it can push down limit.
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
- val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertEquals(1, dataSplitsWithLimit.length)
Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
@@ -240,7 +240,7 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
// Case 1: All dataSplits is rawConvertible.
- val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithoutLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertEquals(4, dataSplitsWithoutLimit.length)
// All dataSplits is rawConvertible.
dataSplitsWithoutLimit.foreach(
@@ -250,19 +250,19 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
// It still returns false even it can push down limit.
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
- val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertEquals(1, dataSplitsWithLimit.length)
Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
- val dataSplitsWithLimit1 = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithLimit1 = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertEquals(2, dataSplitsWithLimit1.length)
Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
// Case 2: Update 2 rawConvertible dataSplits to convert to nonRawConvertible.
spark.sql("INSERT INTO T VALUES (1, 'a2', '11'), (2, 'b2', '22')")
val scanBuilder2 = getScanBuilder()
- val dataSplitsWithoutLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithoutLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertEquals(4, dataSplitsWithoutLimit2.length)
// Now, we have 4 dataSplits, and 2 dataSplit is nonRawConvertible, 2 dataSplit is rawConvertible.
Assertions.assertEquals(
@@ -271,13 +271,13 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
// Return 2 dataSplits.
Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(2))
- val dataSplitsWithLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithLimit2 = scanBuilder2.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertEquals(2, dataSplitsWithLimit2.length)
Assertions.assertEquals(2, spark.sql("SELECT * FROM T LIMIT 2").count())
// 2 dataSplits cannot meet the limit requirement, so need to scan all dataSplits.
Assertions.assertFalse(scanBuilder2.asInstanceOf[SupportsPushDownLimit].pushLimit(3))
- val dataSplitsWithLimit22 = scanBuilder2.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithLimit22 = scanBuilder2.build().asInstanceOf[PaimonScan].inputSplits
// Need to scan all dataSplits.
Assertions.assertEquals(4, dataSplitsWithLimit22.length)
Assertions.assertEquals(3, spark.sql("SELECT * FROM T LIMIT 3").count())
@@ -285,7 +285,7 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
// Case 3: Update the remaining 2 rawConvertible dataSplits to make all dataSplits is nonRawConvertible.
spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
val scanBuilder3 = getScanBuilder()
- val dataSplitsWithoutLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithoutLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].inputSplits
Assertions.assertEquals(4, dataSplitsWithoutLimit3.length)
// All dataSplits is nonRawConvertible.
@@ -295,7 +295,7 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
})
Assertions.assertFalse(scanBuilder3.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
- val dataSplitsWithLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].getOriginSplits
+ val dataSplitsWithLimit3 = scanBuilder3.build().asInstanceOf[PaimonScan].inputSplits
// Need to scan all dataSplits.
Assertions.assertEquals(4, dataSplitsWithLimit3.length)
Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
@@ -321,12 +321,12 @@ abstract class PaimonPushDownTestBase extends PaimonSparkTestBase {
sql("DELETE FROM T WHERE id % 13 = 0")
Assertions.assertEquals(100, spark.sql("SELECT * FROM T LIMIT 100").count())
- val withoutLimit = getScanBuilder().build().asInstanceOf[PaimonScan].getOriginSplits
+ val withoutLimit = getScanBuilder().build().asInstanceOf[PaimonScan].inputSplits
assert(withoutLimit.length == 10)
val scanBuilder = getScanBuilder().asInstanceOf[SupportsPushDownLimit]
scanBuilder.pushLimit(1)
- val withLimit = scanBuilder.build().asInstanceOf[PaimonScan].getOriginSplits
+ val withLimit = scanBuilder.build().asInstanceOf[PaimonScan].inputSplits
if (deletionVectorsEnabled || !primaryKeyTable) {
assert(withLimit.length == 1)
} else {
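
For context on the limit push-down assertions above: the tests now inspect PaimonScan.inputSplits (formerly getOriginSplits) after pushLimit, and pushLimit intentionally returns false so Spark still applies the LIMIT while Paimon prunes splits on a best-effort basis. A minimal sketch of the pattern, assuming the test base's getScanBuilder helper:

    import org.apache.paimon.spark.PaimonScan
    import org.apache.spark.sql.connector.read.SupportsPushDownLimit

    // Sketch only: getScanBuilder is a helper from the test base, not public API.
    val builder = getScanBuilder().asInstanceOf[SupportsPushDownLimit]
    builder.pushLimit(1) // returns false: Paimon prunes splits, Spark still enforces the LIMIT
    val splits = builder.build().asInstanceOf[PaimonScan].inputSplits
    // Only raw-convertible splits can be pruned down to the limit; otherwise all splits remain.
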
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
index 08ceeceb3f..05704e3b36 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SparkV2FilterConverterTestBase.scala
@@ -364,7 +364,7 @@ abstract class SparkV2FilterConverterTestBase extends PaimonSparkTestBase {
}
private def scanFilesCount(str: String, tableName: String = "test_tbl"): Int = {
- getPaimonScan(s"SELECT * FROM $tableName WHERE $str").lazyInputPartitions
+ getPaimonScan(s"SELECT * FROM $tableName WHERE $str").inputPartitions
.flatMap(_.splits)
.map(_.asInstanceOf[DataSplit].dataFiles().size())
.sum
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
index df65a50095..fdb19684d4 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/util/ScanPlanHelperTest.scala
@@ -31,7 +31,7 @@ class ScanPlanHelperTest extends PaimonSparkTestBase with ScanPlanHelper {
sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES ('row-tracking.enabled' = 'true')")
sql("INSERT INTO t VALUES (11, 'a'), (22, 'b')")
- val splits = getPaimonScan("SELECT * FROM t").getOriginSplits.map(_.asInstanceOf[DataSplit])
+ val splits = getPaimonScan("SELECT * FROM t").inputSplits.map(_.asInstanceOf[DataSplit])
val newScanPlan = createNewScanPlan(splits, createRelationV2("t"))
val newDf = createDataset(spark, newScanPlan)