This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e9da0ce9c1 [spark] Refactor PaimonScan hierarchy to support flexible 
runtime filtering implementations (#6616)
e9da0ce9c1 is described below

commit e9da0ce9c1c0652afc27dbb1c917e149627d172a
Author: Kerwin Zhang <[email protected]>
AuthorDate: Mon Nov 17 17:04:51 2025 +0800

    [spark] Refactor PaimonScan hierarchy to support flexible runtime filtering 
implementations (#6616)
---
 .../scala/org/apache/paimon/spark/PaimonScan.scala | 83 +++++++++++++---------
 1 file changed, 50 insertions(+), 33 deletions(-)

diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
index 8d8d1825ba..fe3efcea11 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScan.scala
@@ -42,15 +42,60 @@ case class PaimonScan(
     override val pushDownLimit: Option[Int],
     override val pushDownTopN: Option[TopN],
     bucketedScanDisabled: Boolean = false)
-  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
-  with SupportsRuntimeV2Filtering
-  with SupportsReportPartitioning
-  with SupportsReportOrdering {
-
+  extends PaimonScanCommon(
+    table,
+    requiredSchema,
+    filters,
+    reservedFilters,
+    pushDownLimit,
+    pushDownTopN,
+    bucketedScanDisabled)
+  with SupportsRuntimeV2Filtering {
   def disableBucketedScan(): PaimonScan = {
     copy(bucketedScanDisabled = true)
   }
 
+  // Since Spark 3.2
+  override def filterAttributes(): Array[NamedReference] = {
+    val requiredFields = readBuilder.readType().getFieldNames.asScala
+    table
+      .partitionKeys()
+      .asScala
+      .toArray
+      .filter(requiredFields.contains)
+      .map(fieldReference)
+  }
+
+  override def filter(predicates: Array[SparkPredicate]): Unit = {
+    val converter = SparkV2FilterConverter(table.rowType())
+    val partitionKeys = table.partitionKeys().asScala.toSeq
+    val partitionFilter = predicates.flatMap {
+      case p
+          if 
SparkV2FilterConverter(table.rowType()).isSupportedRuntimeFilter(p, 
partitionKeys) =>
+        converter.convert(p)
+      case _ => None
+    }
+    if (partitionFilter.nonEmpty) {
+      readBuilder.withFilter(partitionFilter.toList.asJava)
+      // set inputPartitions null to trigger to get the new splits.
+      inputPartitions = null
+      inputSplits = null
+    }
+  }
+}
+
+abstract class PaimonScanCommon(
+    table: InnerTable,
+    requiredSchema: StructType,
+    filters: Seq[Predicate],
+    reservedFilters: Seq[Filter],
+    override val pushDownLimit: Option[Int],
+    override val pushDownTopN: Option[TopN],
+    bucketedScanDisabled: Boolean = false)
+  extends PaimonBaseScan(table, requiredSchema, filters, reservedFilters, 
pushDownLimit)
+  with SupportsReportPartitioning
+  with SupportsReportOrdering {
+
   @transient
   private lazy val extractBucketTransform: Option[Transform] = {
     table match {
@@ -175,32 +220,4 @@ case class PaimonScan(
       }
       .toSeq
   }
-
-  // Since Spark 3.2
-  override def filterAttributes(): Array[NamedReference] = {
-    val requiredFields = readBuilder.readType().getFieldNames.asScala
-    table
-      .partitionKeys()
-      .asScala
-      .toArray
-      .filter(requiredFields.contains)
-      .map(fieldReference)
-  }
-
-  override def filter(predicates: Array[SparkPredicate]): Unit = {
-    val converter = SparkV2FilterConverter(table.rowType())
-    val partitionKeys = table.partitionKeys().asScala.toSeq
-    val partitionFilter = predicates.flatMap {
-      case p
-          if 
SparkV2FilterConverter(table.rowType()).isSupportedRuntimeFilter(p, 
partitionKeys) =>
-        converter.convert(p)
-      case _ => None
-    }
-    if (partitionFilter.nonEmpty) {
-      readBuilder.withFilter(partitionFilter.toList.asJava)
-      // set inputPartitions null to trigger to get the new splits.
-      inputPartitions = null
-      inputSplits = null
-    }
-  }
 }

Reply via email to