This is an automated email from the ASF dual-hosted git repository.

liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 00c563e808 [CORE] Refactor push down filter to scan (#11153)
00c563e808 is described below

commit 00c563e80840f49066a031cd43cf2e51c335c63d
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 28 23:06:59 2025 +0800

    [CORE] Refactor push down filter to scan (#11153)
---
 .../clickhouse/CHSparkPlanExecApi.scala            | 29 +++++++++-----
 .../gluten/execution/CHFilterExecTransformer.scala |  2 +-
 .../spark/sql/execution/utils/PushDownUtil.scala   | 36 ++++++-----------
 .../gluten/execution/MiscOperatorSuite.scala       | 10 ++---
 .../gluten/execution/DeltaScanTransformer.scala    |  9 ++++-
 .../gluten/execution/HudiScanTransformer.scala     |  9 ++++-
 .../gluten/execution/IcebergScanTransformer.scala  | 10 ++++-
 .../execution/MicroBatchScanExecTransformer.scala  |  6 ++-
 .../gluten/execution/PaimonScanTransformer.scala   | 12 ++++--
 .../gluten/backendsapi/SparkPlanExecApi.scala      | 45 +++++----------------
 .../BasicPhysicalOperatorTransformer.scala         | 21 ++++------
 .../execution/BasicScanExecTransformer.scala       | 41 ++++++++++++++++++-
 .../execution/BatchScanExecTransformer.scala       | 46 +++++++++++-----------
 .../execution/FileSourceScanExecTransformer.scala  | 28 ++++---------
 .../extension/columnar/PushDownFilterToScan.scala  | 30 +++-----------
 .../sql/hive/HiveTableScanExecTransformer.scala    |  6 ++-
 .../orc/GlutenOrcV2SchemaPruningSuite.scala        |  6 ++-
 .../TestFileSourceScanExecTransformer.scala        |  8 +++-
 .../orc/GlutenOrcV2SchemaPruningSuite.scala        |  6 ++-
 .../TestFileSourceScanExecTransformer.scala        |  8 +++-
 .../orc/GlutenOrcV2SchemaPruningSuite.scala        |  6 ++-
 .../TestFileSourceScanExecTransformer.scala        |  8 +++-
 .../orc/GlutenOrcV2SchemaPruningSuite.scala        |  6 ++-
 .../TestFileSourceScanExecTransformer.scala        |  8 +++-
 .../orc/GlutenOrcV2SchemaPruningSuite.scala        |  6 ++-
 .../TestFileSourceScanExecTransformer.scala        |  8 +++-
 .../sql/execution/FileSourceScanExecShim.scala     |  4 --
 27 files changed, 222 insertions(+), 192 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index dbaccb16c8..82da9aa694 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -849,10 +849,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
     }
   }
 
-  /** Clickhouse Backend only supports part of filters for parquet. */
-  override def postProcessPushDownFilter(
-      extraFilters: Seq[Expression],
-      sparkExecNode: LeafExecNode): Seq[Expression] = {
+  override def supportPushDownFilterToScan(sparkExecNode: LeafExecNode): Boolean = {
     // FIXME: DeltaMergeTreeFileFormat should not inherit from ParquetFileFormat.
     def isParquetFormat(fileFormat: FileFormat): Boolean = fileFormat match {
       case p: ParquetFileFormat if p.shortName().equals("parquet") => true
@@ -870,12 +867,26 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
     sparkExecNode match {
       case fileSourceScan: FileSourceScanExecTransformerBase
           if isParquetFormat(fileSourceScan.relation.fileFormat) =>
-        PushDownUtil.removeNotSupportPushDownFilters(
-          fileSourceScan.conf,
-          fileSourceScan.output,
-          fileSourceScan.dataFilters)
-      case _ => super.postProcessPushDownFilter(extraFilters, sparkExecNode)
+        false
+      case _ => true
+    }
+  }
+
+  /** Clickhouse Backend only supports part of filters for parquet. */
+  override def isSupportedScanFilter(filter: Expression, sparkExecNode: LeafExecNode): Boolean = {
+    // FIXME: DeltaMergeTreeFileFormat should not inherit from ParquetFileFormat.
+    def isParquetFormat(fileFormat: FileFormat): Boolean = fileFormat match {
+      case p: ParquetFileFormat if p.shortName().equals("parquet") => true
+      case _ => false
+    }
+
+    val isSupported = sparkExecNode match {
+      case fileSourceScan: FileSourceScanExecTransformerBase
+          if isParquetFormat(fileSourceScan.relation.fileFormat) =>
+        PushDownUtil.isSupportPushDownFilter(fileSourceScan.conf, fileSourceScan.output, filter)
+      case _ => true
     }
+    isSupported && super.isSupportedScanFilter(filter, sparkExecNode)
   }
 
   override def genGenerateTransformer(
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
index 686e51fcf6..19b9238cbd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
@@ -36,7 +36,7 @@ case class CHFilterExecTransformer(condition: Expression, child: SparkPlan)
       condition
     } else {
       val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
+        FilterHandler.subtractFilters(splitConjunctivePredicates(condition), scanFilters)
       remainingFilters.reduceLeftOption(And).orNull
     }
   }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala
index 4eb326fe9a..fdad42cabd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala
@@ -26,33 +26,23 @@ import org.apache.spark.sql.sources
 
 object PushDownUtil {
 
-  def removeNotSupportPushDownFilters(
+  def isSupportPushDownFilter(
       conf: SQLConf,
       output: Seq[Attribute],
-      dataFilters: Seq[Expression]
-  ): Seq[Expression] = {
+      filter: Expression
+  ): Boolean = {
     val schema = new SparkToParquetSchemaConverter(conf).convert(
       SparkShimLoader.getSparkShims.structFromAttributes(output))
     val parquetFilters = SparkShimLoader.getSparkShims.createParquetFilters(conf, schema)
-
-    dataFilters
-      .flatMap {
-        sparkFilter =>
-          DataSourceStrategy.translateFilter(
-            sparkFilter,
-            supportNestedPredicatePushdown = true) match {
-            case Some(sources.StringStartsWith(_, _)) => None
-            case Some(sources.Not(sources.In(_, _) | sources.StringStartsWith(_, _))) => None
-            case Some(sourceFilter) => Some((sparkFilter, sourceFilter))
-            case _ => None
-          }
-      }
-      .flatMap {
-        case (sparkFilter, sourceFilter) =>
-          parquetFilters.createFilter(sourceFilter) match {
-            case Some(_) => Some(sparkFilter)
-            case None => None
-          }
-      }
+    DataSourceStrategy.translateFilter(filter, supportNestedPredicatePushdown = true) match {
+      case Some(sources.StringStartsWith(_, _)) => false
+      case Some(sources.Not(sources.In(_, _) | sources.StringStartsWith(_, _))) => false
+      case Some(sourceFilter) =>
+        parquetFilters.createFilter(sourceFilter) match {
+          case Some(_) => true
+          case None => false
+        }
+      case _ => false
+    }
   }
 }
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 3006488145..c313242091 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -1976,11 +1976,11 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
     val filters = plan.collect { case filter: FilterExecTransformer => filter }
     assert(scans.size == 1)
     assert(filters.size == 1)
-    assert(scans(0).dataFilters.size == 1)
-    val remainingFilters = FilterHandler.getRemainingFilters(
-      scans(0).dataFilters,
-      splitConjunctivePredicates(filters(0).condition))
-    assert(remainingFilters.size == 0)
+    assert(scans.head.filterExprs().size == 1)
+    val remainingFilters = FilterHandler.subtractFilters(
+      splitConjunctivePredicates(filters.head.condition),
+      scans.head.filterExprs())
+    assert(remainingFilters.isEmpty)
 
     // result length check, table lineitem has 60,000 rows
     val resultLength = df.collect().length
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index f55bdebf51..c3b88e8225 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -35,7 +35,8 @@ case class DeltaScanTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -74,9 +75,13 @@ case class DeltaScanTransformer(
       optionalNumCoalescedBuckets,
       QueryPlan.normalizePredicates(dataFilters, output),
       None,
-      disableBucketedScan
+      disableBucketedScan,
+      pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
 
 object DeltaScanTransformer {
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
index ba430eb8d1..b320ccef33 100644
--- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -35,7 +35,8 @@ case class HudiScanTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -66,9 +67,13 @@ case class HudiScanTransformer(
       optionalNumCoalescedBuckets,
       QueryPlan.normalizePredicates(dataFilters, output),
       None,
-      disableBucketedScan
+      disableBucketedScan,
+      pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
 
 object HudiScanTransformer {
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index e5b098eaa8..ba1e2e399f 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -47,7 +47,8 @@ case class IcebergScanTransformer(
     override val runtimeFilters: Seq[Expression],
     @transient override val table: Table,
     override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
-    override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None)
+    override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends BatchScanExecTransformerBase(
     output = output,
     scan = scan,
@@ -62,6 +63,10 @@ case class IcebergScanTransformer(
   // So use Metric to get NumSplits, NumDeletes is not reported by native metric
   private val numSplits = SQLMetrics.createMetric(sparkContext, new NumSplits().description())
 
+  override def withNewPushdownFilters(filters: Seq[Expression]): BatchScanExecTransformerBase = {
+    this.copy(pushDownFilters = Some(filters))
+  }
+
   protected[this] def supportsBatchScan(scan: Scan): Boolean = {
     IcebergScanTransformer.supportsBatchScan(scan)
   }
@@ -188,7 +193,8 @@ case class IcebergScanTransformer(
       output = output.map(QueryPlan.normalizeExpressions(_, output)),
       runtimeFilters = QueryPlan.normalizePredicates(
         runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
+        output),
+      pushDownFilters = pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
   // Needed for tests
diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
index 2f5e277d03..05688c44b6 100644
--- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
+++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -64,7 +64,11 @@ case class MicroBatchScanExecTransformer(
   @transient override lazy val inputPartitionsShim: Seq[InputPartition] =
     stream.planInputPartitions(start, end)
 
-  override def filterExprs(): Seq[Expression] = Seq.empty
+  override def scanFilters: Seq[Expression] = Seq.empty
+
+  override def supportPushDownFilters: Boolean = false
+
+  def pushDownFilters: Option[Seq[Expression]] = None
 
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
 
diff --git a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
index 5dbce795a9..05d41590c9 100644
--- a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
+++ b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -52,7 +52,8 @@ case class PaimonScanTransformer(
     override val runtimeFilters: Seq[Expression],
     @transient override val table: Table,
     override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
-    override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None)
+    override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends BatchScanExecTransformerBase(
     output = output,
     scan = scan,
@@ -74,8 +75,6 @@ case class PaimonScanTransformer(
       throw new GlutenNotSupportException("Only support PaimonScan.")
   }
 
-  override def filterExprs(): Seq[Expression] = pushdownFilters
-
   override def getPartitionSchema: StructType = scan match {
     case paimonScan: PaimonScan =>
       val partitionKeys = paimonScan.table.partitionKeys()
@@ -86,6 +85,10 @@ case class PaimonScanTransformer(
 
   override def getDataSchema: StructType = new StructType()
 
+  override def withNewPushdownFilters(filters: Seq[Expression]): PaimonScanTransformer = {
+    this.copy(pushDownFilters = Some(filters))
+  }
+
   override lazy val fileFormat: ReadFileFormat = {
     val formatStr = coreOptions.fileFormatString()
     if ("parquet".equalsIgnoreCase(formatStr)) {
@@ -180,7 +183,8 @@ case class PaimonScanTransformer(
       output = output.map(QueryPlan.normalizeExpressions(_, output)),
       runtimeFilters = QueryPlan.normalizePredicates(
         runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
+        output),
+      pushDownFilters = pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index a8ffca51ba..980c04e99e 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.FileFormat
-import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -652,41 +651,15 @@ trait SparkPlanExecApi {
 
   def rewriteSpillPath(path: String): String = path
 
-  /**
-   * Vanilla spark just push down part of filter condition into scan, however gluten can push down
-   * all filters. This function calculates the remaining conditions in FilterExec, add into the
-   * dataFilters of the leaf node.
-   * @param extraFilters:
-   *   Conjunctive Predicates, which are split from the upper FilterExec
-   * @param sparkExecNode:
-   *   The vanilla leaf node of the plan tree, which is FileSourceScanExec or BatchScanExec
-   * @return
-   *   return all push down filters
-   */
-  def postProcessPushDownFilter(
-      extraFilters: Seq[Expression],
-      sparkExecNode: LeafExecNode): Seq[Expression] = {
-    def getPushedFilter(dataFilters: Seq[Expression]): Seq[Expression] = {
-      val pushedFilters =
-        dataFilters ++ FilterHandler.getRemainingFilters(dataFilters, extraFilters)
-      pushedFilters.filterNot(_.references.exists {
-        attr => BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
-      })
-    }
-    sparkExecNode match {
-      case fileSourceScan: FileSourceScanExecTransformerBase =>
-        getPushedFilter(fileSourceScan.dataFilters)
-      case batchScan: BatchScanExecTransformerBase =>
-        batchScan.scan match {
-          case fileScan: FileScan =>
-            getPushedFilter(fileScan.dataFilters)
-          case _ =>
-            // TODO: For data lake format use pushedFilters in SupportsPushDownFilters
-            extraFilters
-        }
-      case _ =>
-        throw new GlutenNotSupportException(s"${sparkExecNode.getClass.toString} is not supported.")
-    }
+  def supportPushDownFilterToScan(sparkExecNode: LeafExecNode): Boolean = true
+
+  /** Return whether the filter is supported in scan. */
+  def isSupportedScanFilter(filter: Expression, sparkExecNode: LeafExecNode): Boolean = {
+    ExpressionConverter.canReplaceWithExpressionTransformer(
+      ExpressionConverter.replaceAttributeReference(filter),
+      sparkExecNode.output) &&
+    (!filter.references.exists(
+      attr => BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)))
   }
 
   def genGenerateTransformer(
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 463f540d94..24486efa54 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -114,7 +114,7 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
       cond
     } else {
       val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(cond))
+        FilterHandler.subtractFilters(splitConjunctivePredicates(cond), scanFilters)
       remainingFilters.reduceLeftOption(And).orNull
     }
   }
@@ -312,16 +312,11 @@ object FilterHandler extends PredicateHelper {
     }
   }
 
-  /**
-   * Compare the semantics of the filter conditions pushed down to Scan and in the Filter.
-   *
-   * @param scanFilters
-   *   : the conditions pushed down into Scan
-   * @param filters
-   *   : the conditions in the Filter after the Scan
-   * @return
-   *   the filter conditions not pushed down into Scan.
-   */
-  def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
-    (filters.toSet -- scanFilters.toSet).toSeq
+  def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
+    (left.toSet -- right.toSet).toSeq
+  }
+
+  def combineFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
+    (left.toSet ++ right.toSet).toSeq
+  }
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index ee6770bb17..5de0683e96 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -37,7 +37,46 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   import org.apache.spark.sql.catalyst.util._
 
   /** Returns the filters that can be pushed down to native file scan */
-  def filterExprs(): Seq[Expression]
+  final def filterExprs(): Seq[Expression] = {
+    if (pushDownFilters.nonEmpty) {
+      val (_, scanFiltersNotInPushDownFilters) =
+        scanFilters.partition(pushDownFilters.get.contains(_))
+      // For filters that only exists in scan, we need to check if they are supported.
+      val unsupportedFilters = scanFiltersNotInPushDownFilters.filter(
+        !BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+      if (unsupportedFilters.nonEmpty) {
+        throw new UnsupportedOperationException(
+          "Found unsupported filter in scan " + unsupportedFilters.mkString(", "))
+      }
+      val supportedPushDownFilters = pushDownFilters.get
+        .filter(BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+      FilterHandler.combineFilters(supportedPushDownFilters, scanFiltersNotInPushDownFilters)
+    } else {
+      // todo: When PushDownFilterToScan is not performed, find a way to throw an
+      //  exception to trigger scan fallback when encountering unsupported scan filters,
+      //  instead of simply filtering them out.
+      scanFilters.filter(
+        BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+    }
+  }
+
+  /** Returns the filters that already exists in scan. */
+  def scanFilters: Seq[Expression]
+
+  /** Whether the scan supports push down filters. */
+  def supportPushDownFilters: Boolean = true
+
+  /**
+   * Returns the filters that pushed by
+   * [[org.apache.gluten.extension.columnar.PushDownFilterToScan]].
+   */
+  def pushDownFilters: Option[Seq[Expression]]
+
+  /** Copy the scan with filters that pushed by filterNode. */
+  def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = {
+    throw new UnsupportedOperationException(
+      s"${getClass.toString} does not support push down filters.")
+  }
 
   def getMetadataColumns(): Seq[AttributeReference]
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 1d0a38e887..c841f816e7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -34,6 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileSca
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 
+import com.google.common.base.Objects
+
 /** Columnar Based BatchScanExec. */
 case class BatchScanExecTransformer(
     override val output: Seq[AttributeReference],
@@ -44,7 +45,8 @@ case class BatchScanExecTransformer(
     @transient override val table: Table,
     override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
     override val applyPartialClustering: Boolean = false,
-    override val replicatePartitions: Boolean = false)
+    override val replicatePartitions: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends BatchScanExecTransformerBase(
     output,
     scan,
@@ -65,9 +67,14 @@ case class BatchScanExecTransformer(
       output = output.map(QueryPlan.normalizeExpressions(_, output)),
       runtimeFilters = QueryPlan.normalizePredicates(
         runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
+        output),
+      pushDownFilters = pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BatchScanExecTransformerBase = {
+    this.copy(pushDownFilters = Some(filters))
+  }
 }
 
 abstract class BatchScanExecTransformerBase(
@@ -102,30 +109,14 @@ abstract class BatchScanExecTransformerBase(
     postDriverMetrics()
   }
 
-  // Similar to the problem encountered in https://github.com/oap-project/gluten/pull/3184,
-  // we cannot add member variables to BatchScanExecTransformerBase, which inherits from case
-  // class. Otherwise, we will encounter an issue where makeCopy cannot find a constructor
-  // with the corresponding number of parameters.
-  // The workaround is to add a mutable list to pass in pushdownFilters.
-  protected var pushdownFilters: Seq[Expression] = scan match {
-    case fileScan: FileScan =>
-      fileScan.dataFilters.filter {
-        expr =>
-          ExpressionConverter.canReplaceWithExpressionTransformer(
-            ExpressionConverter.replaceAttributeReference(expr),
-            output)
-      }
+  override def scanFilters: Seq[Expression] = scan match {
+    case fileScan: FileScan => fileScan.dataFilters
     case _ =>
-      logInfo(s"${scan.getClass.toString} does not support push down filters")
+      // todo: support other DSv2 scan
+      logInfo(s"${scan.getClass.toString} does not support extracting scan filters.")
       Seq.empty
   }
 
-  def setPushDownFilters(filters: Seq[Expression]): Unit = {
-    pushdownFilters = filters
-  }
-
-  override def filterExprs(): Seq[Expression] = pushdownFilters
-
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
 
   override def getPartitions: Seq[Partition] = finalPartitions
@@ -198,6 +189,15 @@ abstract class BatchScanExecTransformerBase(
   @transient override lazy val fileFormat: ReadFileFormat =
     BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
 
+  override def equals(other: Any): Boolean = other match {
+    case other: BatchScanExecTransformerBase =>
+      this.pushDownFilters == other.pushDownFilters && super.equals(other)
+    case _ =>
+      false
+  }
+
+  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters, pushDownFilters)
+
   override def simpleString(maxFields: Int): String = {
     val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
     val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index c00ed1f671..1e7de78d94 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -33,7 +32,6 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SparkVersionUtil
 import org.apache.spark.util.collection.BitSet
 
 import org.apache.commons.lang3.StringUtils
@@ -47,7 +45,8 @@ case class FileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -71,9 +70,13 @@ case class FileSourceScanExecTransformer(
       optionalNumCoalescedBuckets,
       QueryPlan.normalizePredicates(dataFilters, output),
       None,
-      disableBucketedScan
+      disableBucketedScan,
+      pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): FileSourceScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
 
 abstract class FileSourceScanExecTransformerBase(
@@ -104,22 +107,7 @@ abstract class FileSourceScanExecTransformerBase(
       .genFileSourceScanTransformerMetrics(sparkContext)
       .filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias
 
-  override def filterExprs(): Seq[Expression] = dataFiltersInScan.filter {
-    expr =>
-      ExpressionConverter.canReplaceWithExpressionTransformer(
-        ExpressionConverter.replaceAttributeReference(expr),
-        output)
-  }
-
-  override def dataFiltersInScan: Seq[Expression] = {
-    if (SparkVersionUtil.gteSpark35) {
-      dataFilters.filterNot(_.references.exists {
-        attr => BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
-      })
-    } else {
-      super.dataFiltersInScan
-    }
-  }
+  override def scanFilters: Seq[Expression] = dataFilters
 
   override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala
index e0da91dd7c..9a6e271b35 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.{BatchScanExecTransformerBase, FileSourceScanExecTransformer, FilterExecTransformerBase}
+import org.apache.gluten.execution.{BasicScanExecTransformer, FilterExecTransformerBase}
 
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -31,35 +31,15 @@ object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper {
   override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case filter: FilterExecTransformerBase =>
       filter.child match {
-        case fileScan: FileSourceScanExecTransformer =>
-          val pushDownFilters =
-            BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
-              splitConjunctivePredicates(filter.cond),
-              fileScan)
-          val newScan = fileScan.copy(dataFilters = pushDownFilters)
+        case scan: BasicScanExecTransformer
+            if BackendsApiManager.getSparkPlanExecApiInstance.supportPushDownFilterToScan(
+              scan) && scan.supportPushDownFilters =>
+          val newScan = scan.withNewPushdownFilters(splitConjunctivePredicates(filter.cond))
           if (newScan.doValidate().ok()) {
             filter.withNewChildren(Seq(newScan))
           } else {
             filter
           }
-        case batchScan: BatchScanExecTransformerBase =>
-          val pushDownFilters =
-            BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
-              splitConjunctivePredicates(filter.cond),
-              batchScan)
-          // If BatchScanExecTransformerBase's parent is filter, pushdownFilters can't be None.
-          batchScan.setPushDownFilters(Seq.empty)
-          val newScan = batchScan
-          if (pushDownFilters.nonEmpty) {
-            newScan.setPushDownFilters(pushDownFilters)
-            if (newScan.doValidate().ok()) {
-              filter.withNewChildren(Seq(newScan))
-            } else {
-              filter
-            }
-          } else {
-            filter
-          }
         case _ => filter
       }
   }
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 6d0b16fabb..8bf3e74ac0 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -63,7 +63,11 @@ case class HiveTableScanExecTransformer(
     hiveQlTable.getOutputFormatClass,
     hiveQlTable.getMetadata)
 
-  override def filterExprs(): Seq[Expression] = Seq.empty
+  override def scanFilters: Seq[Expression] = Seq.empty
+
+  override def supportPushDownFilters: Boolean = false
+
+  override def pushDownFilters: Option[Seq[Expression]] = None
 
   override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
 
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index 101d6fb342..4356467a6d 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -29,8 +29,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 884e62d5b9..2857a4a680 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -38,7 +38,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -82,4 +83,7 @@ case class TestFileSourceScanExecTransformer(
       disableBucketedScan
     )
   }
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index 04a4c05b9d..76a9a6ef95 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 9f5d47b60b..6aced7cfa6 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -65,4 +66,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index 000103b903..76a9a6ef95 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 9f5d47b60b..6aced7cfa6 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -65,4 +66,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index e47deb0d70..76a9a6ef95 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 9f5d47b60b..6aced7cfa6 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -65,4 +66,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index e47deb0d70..76a9a6ef95 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index e805d175be..644e3c5839 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -64,4 +65,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 234762f2f7..75049793d8 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -58,10 +58,6 @@ abstract class FileSourceScanExecShim(
 
   protected lazy val driverMetricsAlias = driverMetrics
 
-  def dataFiltersInScan: Seq[Expression] = {
-    throw new UnsupportedOperationException("Not implemented")
-  }
-
   def hasUnsupportedColumns: Boolean = {
     // TODO, fallback if user define same name column due to we can't right now
     // detect which column is metadata column which is user defined column.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
