This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e9da0ce9c1 [spark] Refactor PaimonScan hierarchy to support flexible runtime filtering implementations (#6616)
e9da0ce9c1 is described below
commit e9da0ce9c1c0652afc27dbb1c917e149627d172a
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Nov 17 17:04:51 2025 +0800
[spark] Refactor PaimonScan hierarchy to support flexible runtime filtering implementations (#6616)
---
.../scala/org/apache/paimon/spark/PaimonScan.scala | 83 +++++++++++++---------
1 file changed, 50 insertions(+), 33 deletions(-)
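For readers skimming the change: the refactor splits the concrete scan into a shared abstract base (PaimonScanCommon, which keeps the partitioning and ordering reporting) and a thin concrete layer (PaimonScan, which adds runtime V2 filtering). Below is a minimal, simplified sketch of that shape; the names suffixed with "Sketch" are stand-ins for illustration only, and the real signatures are in the diff that follows.

// Stand-in traits for the Spark DataSource V2 mix-ins, used here only to show the split;
// the real interfaces live in org.apache.spark.sql.connector.read.
trait SupportsReportPartitioning
trait SupportsReportOrdering
trait SupportsRuntimeV2Filtering

// Shared abstract base: the scan behavior every implementation needs,
// analogous to PaimonScanCommon in the diff below.
abstract class PaimonScanCommonSketch
  extends SupportsReportPartitioning
  with SupportsReportOrdering

// Concrete scan: layers runtime filtering on top of the shared base, so alternative
// runtime-filtering implementations can extend the base without duplicating the rest.
class PaimonScanSketch
  extends PaimonScanCommonSketch
  with SupportsRuntimeV2Filtering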
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 8d8d1825ba..fe3efcea11 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -42,15 +42,60 @@ case class PaimonScan(
override val pushDownLimit: Option[Int],
override val pushDownTopN: Option[TopN],
bucketedScanDisabled: Boolean = false)
-  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
- with SupportsRuntimeV2Filtering
- with SupportsReportPartitioning
- with SupportsReportOrdering {
-
+ extends PaimonScanCommon(
+ table,
+ requiredSchema,
+ filters,
+ reservedFilters,
+ pushDownLimit,
+ pushDownTopN,
+ bucketedScanDisabled)
+ with SupportsRuntimeV2Filtering {
def disableBucketedScan(): PaimonScan = {
copy(bucketedScanDisabled = true)
}
+ // Since Spark 3.2
+ override def filterAttributes(): Array[NamedReference] = {
+ val requiredFields = readBuilder.readType().getFieldNames.asScala
+ table
+ .partitionKeys()
+ .asScala
+ .toArray
+ .filter(requiredFields.contains)
+ .map(fieldReference)
+ }
+
+ override def filter(predicates: Array[SparkPredicate]): Unit = {
+ val converter = SparkV2FilterConverter(table.rowType())
+ val partitionKeys = table.partitionKeys().asScala.toSeq
+ val partitionFilter = predicates.flatMap {
+ case p
+        if SparkV2FilterConverter(table.rowType()).isSupportedRuntimeFilter(p, partitionKeys) =>
+ converter.convert(p)
+ case _ => None
+ }
+ if (partitionFilter.nonEmpty) {
+ readBuilder.withFilter(partitionFilter.toList.asJava)
+      // Set inputPartitions and inputSplits to null so the new splits are re-planned.
+ inputPartitions = null
+ inputSplits = null
+ }
+ }
+}
+
+abstract class PaimonScanCommon(
+ table: InnerTable,
+ requiredSchema: StructType,
+ filters: Seq[Predicate],
+ reservedFilters: Seq[Filter],
+ override val pushDownLimit: Option[Int],
+ override val pushDownTopN: Option[TopN],
+ bucketedScanDisabled: Boolean = false)
+  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, pushDownLimit)
+ with SupportsReportPartitioning
+ with SupportsReportOrdering {
+
@transient
private lazy val extractBucketTransform: Option[Transform] = {
table match {
@@ -175,32 +220,4 @@ case class PaimonScan(
}
.toSeq
}
-
- // Since Spark 3.2
- override def filterAttributes(): Array[NamedReference] = {
- val requiredFields = readBuilder.readType().getFieldNames.asScala
- table
- .partitionKeys()
- .asScala
- .toArray
- .filter(requiredFields.contains)
- .map(fieldReference)
- }
-
- override def filter(predicates: Array[SparkPredicate]): Unit = {
- val converter = SparkV2FilterConverter(table.rowType())
- val partitionKeys = table.partitionKeys().asScala.toSeq
- val partitionFilter = predicates.flatMap {
- case p
-        if SparkV2FilterConverter(table.rowType()).isSupportedRuntimeFilter(p, partitionKeys) =>
- converter.convert(p)
- case _ => None
- }
- if (partitionFilter.nonEmpty) {
- readBuilder.withFilter(partitionFilter.toList.asJava)
-      // Set inputPartitions and inputSplits to null so the new splits are re-planned.
- inputPartitions = null
- inputSplits = null
- }
- }
}
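For context on what the SupportsRuntimeV2Filtering hooks above enable, here is a hedged usage sketch. The catalog, table, and column names are made up for illustration and do not come from the commit; the point is only the call sequence Spark drives against the scan.

import org.apache.spark.sql.SparkSession

object RuntimeFilteringSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("paimon-runtime-filtering-sketch")
      .getOrCreate()

    // Hypothetical join: fact_orders is a Paimon table partitioned by dt, and the
    // dimension side produces the dt values that survive the WHERE clause.
    spark.sql(
      """
        |SELECT f.*
        |FROM paimon.db.fact_orders f
        |JOIN paimon.db.dim_dates d ON f.dt = d.dt
        |WHERE d.is_active = true
        |""".stripMargin).show()

    // During planning, Spark calls filterAttributes() to learn that the partition key
    // `dt` is eligible for runtime filtering, then calls filter(...) with predicates
    // derived from the dimension side. PaimonScan converts the supported predicates,
    // pushes them via readBuilder.withFilter, and clears inputPartitions/inputSplits so
    // splits are re-planned against only the matching partitions.
    spark.stop()
  }
}

The design point of the refactor is that this runtime-filtering behavior now lives only in PaimonScan, while PaimonScanCommon keeps the partitioning and ordering reporting that any alternative implementation can reuse.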