This is an automated email from the ASF dual-hosted git repository.
liujiayi771 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 00c563e808 [CORE] Refactor push down filter to scan (#11153)
00c563e808 is described below
commit 00c563e80840f49066a031cd43cf2e51c335c63d
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Nov 28 23:06:59 2025 +0800
[CORE] Refactor push down filter to scan (#11153)
---
.../clickhouse/CHSparkPlanExecApi.scala | 29 +++++++++-----
.../gluten/execution/CHFilterExecTransformer.scala | 2 +-
.../spark/sql/execution/utils/PushDownUtil.scala | 36 ++++++-----------
.../gluten/execution/MiscOperatorSuite.scala | 10 ++---
.../gluten/execution/DeltaScanTransformer.scala | 9 ++++-
.../gluten/execution/HudiScanTransformer.scala | 9 ++++-
.../gluten/execution/IcebergScanTransformer.scala | 10 ++++-
.../execution/MicroBatchScanExecTransformer.scala | 6 ++-
.../gluten/execution/PaimonScanTransformer.scala | 12 ++++--
.../gluten/backendsapi/SparkPlanExecApi.scala | 45 +++++----------------
.../BasicPhysicalOperatorTransformer.scala | 21 ++++------
.../execution/BasicScanExecTransformer.scala | 41 ++++++++++++++++++-
.../execution/BatchScanExecTransformer.scala | 46 +++++++++++-----------
.../execution/FileSourceScanExecTransformer.scala | 28 ++++---------
.../extension/columnar/PushDownFilterToScan.scala | 30 +++-----------
.../sql/hive/HiveTableScanExecTransformer.scala | 6 ++-
.../orc/GlutenOrcV2SchemaPruningSuite.scala | 6 ++-
.../TestFileSourceScanExecTransformer.scala | 8 +++-
.../orc/GlutenOrcV2SchemaPruningSuite.scala | 6 ++-
.../TestFileSourceScanExecTransformer.scala | 8 +++-
.../orc/GlutenOrcV2SchemaPruningSuite.scala | 6 ++-
.../TestFileSourceScanExecTransformer.scala | 8 +++-
.../orc/GlutenOrcV2SchemaPruningSuite.scala | 6 ++-
.../TestFileSourceScanExecTransformer.scala | 8 +++-
.../orc/GlutenOrcV2SchemaPruningSuite.scala | 6 ++-
.../TestFileSourceScanExecTransformer.scala | 8 +++-
.../sql/execution/FileSourceScanExecShim.scala | 4 --
27 files changed, 222 insertions(+), 192 deletions(-)
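Condensed from the diffs below, the refactor replaces the single backend hook postProcessPushDownFilter with two finer-grained predicates and moves the combining logic into BasicScanExecTransformer.filterExprs(). A simplified, non-compilable sketch of the new contract (signatures abbreviated; see the full diffs that follow):

    // Sketch only: condensed from this patch, not a verbatim excerpt.
    trait SparkPlanExecApi {
      // Backend-level veto: may filters be pushed into this scan node at all?
      def supportPushDownFilterToScan(sparkExecNode: LeafExecNode): Boolean = true
      // Per-expression check consulted when building the native scan filters.
      def isSupportedScanFilter(filter: Expression, sparkExecNode: LeafExecNode): Boolean
    }

    trait BasicScanExecTransformer {
      def scanFilters: Seq[Expression]              // filters the scan already carries
      def pushDownFilters: Option[Seq[Expression]]  // filters pushed by PushDownFilterToScan
      def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer
      final def filterExprs(): Seq[Expression]      // combines the two, per backend support
    }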
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index dbaccb16c8..82da9aa694 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -849,10 +849,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
     }
   }
 
-  /** Clickhouse Backend only supports part of filters for parquet. */
-  override def postProcessPushDownFilter(
-      extraFilters: Seq[Expression],
-      sparkExecNode: LeafExecNode): Seq[Expression] = {
+  override def supportPushDownFilterToScan(sparkExecNode: LeafExecNode): Boolean = {
     // FIXME: DeltaMergeTreeFileFormat should not inherit from ParquetFileFormat.
     def isParquetFormat(fileFormat: FileFormat): Boolean = fileFormat match {
       case p: ParquetFileFormat if p.shortName().equals("parquet") => true
@@ -870,12 +867,26 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
     sparkExecNode match {
       case fileSourceScan: FileSourceScanExecTransformerBase
           if isParquetFormat(fileSourceScan.relation.fileFormat) =>
-        PushDownUtil.removeNotSupportPushDownFilters(
-          fileSourceScan.conf,
-          fileSourceScan.output,
-          fileSourceScan.dataFilters)
-      case _ => super.postProcessPushDownFilter(extraFilters, sparkExecNode)
+        false
+      case _ => true
+    }
+  }
+
+  /** Clickhouse Backend only supports part of filters for parquet. */
+  override def isSupportedScanFilter(filter: Expression, sparkExecNode: LeafExecNode): Boolean = {
+    // FIXME: DeltaMergeTreeFileFormat should not inherit from ParquetFileFormat.
+    def isParquetFormat(fileFormat: FileFormat): Boolean = fileFormat match {
+      case p: ParquetFileFormat if p.shortName().equals("parquet") => true
+      case _ => false
+    }
+
+    val isSupported = sparkExecNode match {
+      case fileSourceScan: FileSourceScanExecTransformerBase
+          if isParquetFormat(fileSourceScan.relation.fileFormat) =>
+        PushDownUtil.isSupportPushDownFilter(fileSourceScan.conf, fileSourceScan.output, filter)
+      case _ => true
     }
+    isSupported && super.isSupportedScanFilter(filter, sparkExecNode)
   }
override def genGenerateTransformer(
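For the ClickHouse backend the two hooks split the old logic: supportPushDownFilterToScan vetoes pushing extra conditions into parquet scans, while isSupportedScanFilter still screens each individual filter through PushDownUtil. A hedged sketch of how a caller is expected to compose them; `scan` and `cond` are hypothetical locals, not identifiers from this patch:

    // Illustrative composition of the two hooks (assumed call sites).
    val api = BackendsApiManager.getSparkPlanExecApiInstance
    if (api.supportPushDownFilterToScan(scan)) {
      // Only conjuncts that pass the per-expression check may be pushed.
      val pushable = splitConjunctivePredicates(cond).filter(api.isSupportedScanFilter(_, scan))
    }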
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
index 686e51fcf6..19b9238cbd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHFilterExecTransformer.scala
@@ -36,7 +36,7 @@ case class CHFilterExecTransformer(condition: Expression, child: SparkPlan)
       condition
     } else {
       val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
+        FilterHandler.subtractFilters(splitConjunctivePredicates(condition), scanFilters)
       remainingFilters.reduceLeftOption(And).orNull
     }
   }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala
index 4eb326fe9a..fdad42cabd 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/PushDownUtil.scala
@@ -26,33 +26,23 @@ import org.apache.spark.sql.sources
 
 object PushDownUtil {
 
-  def removeNotSupportPushDownFilters(
+  def isSupportPushDownFilter(
       conf: SQLConf,
       output: Seq[Attribute],
-      dataFilters: Seq[Expression]
-  ): Seq[Expression] = {
+      filter: Expression
+  ): Boolean = {
     val schema = new SparkToParquetSchemaConverter(conf).convert(
       SparkShimLoader.getSparkShims.structFromAttributes(output))
     val parquetFilters =
       SparkShimLoader.getSparkShims.createParquetFilters(conf, schema)
-
-    dataFilters
-      .flatMap {
-        sparkFilter =>
-          DataSourceStrategy.translateFilter(
-            sparkFilter,
-            supportNestedPredicatePushdown = true) match {
-            case Some(sources.StringStartsWith(_, _)) => None
-            case Some(sources.Not(sources.In(_, _) | sources.StringStartsWith(_, _))) => None
-            case Some(sourceFilter) => Some((sparkFilter, sourceFilter))
-            case _ => None
-          }
-      }
-      .flatMap {
-        case (sparkFilter, sourceFilter) =>
-          parquetFilters.createFilter(sourceFilter) match {
-            case Some(_) => Some(sparkFilter)
-            case None => None
-          }
-      }
+    DataSourceStrategy.translateFilter(filter, supportNestedPredicatePushdown = true) match {
+      case Some(sources.StringStartsWith(_, _)) => false
+      case Some(sources.Not(sources.In(_, _) | sources.StringStartsWith(_, _))) => false
+      case Some(sourceFilter) =>
+        parquetFilters.createFilter(sourceFilter) match {
+          case Some(_) => true
+          case None => false
+        }
+      case _ => false
+    }
   }
 }
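The removed batch API can be recovered from the new per-filter predicate; a sketch of the equivalence, assuming the same conf/output/dataFilters arguments as before:

    // Sketch: the old removeNotSupportPushDownFilters expressed via the new predicate.
    def removeNotSupportPushDownFilters(
        conf: SQLConf,
        output: Seq[Attribute],
        dataFilters: Seq[Expression]): Seq[Expression] =
      dataFilters.filter(PushDownUtil.isSupportPushDownFilter(conf, output, _))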
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
index 3006488145..c313242091 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/MiscOperatorSuite.scala
@@ -1976,11 +1976,11 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
val filters = plan.collect { case filter: FilterExecTransformer => filter }
assert(scans.size == 1)
assert(filters.size == 1)
- assert(scans(0).dataFilters.size == 1)
- val remainingFilters = FilterHandler.getRemainingFilters(
- scans(0).dataFilters,
- splitConjunctivePredicates(filters(0).condition))
- assert(remainingFilters.size == 0)
+ assert(scans.head.filterExprs().size == 1)
+ val remainingFilters = FilterHandler.subtractFilters(
+ splitConjunctivePredicates(filters.head.condition),
+ scans.head.filterExprs())
+ assert(remainingFilters.isEmpty)
// result length check, table lineitem has 60,000 rows
val resultLength = df.collect().length
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index f55bdebf51..c3b88e8225 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -35,7 +35,8 @@ case class DeltaScanTransformer(
override val optionalNumCoalescedBuckets: Option[Int],
override val dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier],
- override val disableBucketedScan: Boolean = false)
+ override val disableBucketedScan: Boolean = false,
+ override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
output,
@@ -74,9 +75,13 @@ case class DeltaScanTransformer(
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None,
- disableBucketedScan
+ disableBucketedScan,
+ pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
)
}
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
}
object DeltaScanTransformer {
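DeltaScanTransformer is the first of several file-source transformers in this patch (Hudi, the generic FileSourceScanExecTransformer, and the test shims follow the same shape): the pushed filters travel as an extra case-class field, so withNewPushdownFilters is just a copy. A minimal sketch of the shared pattern; `SomeScanTransformer` is a hypothetical stand-in, not a class from the patch:

    // Shared shape across the FileSourceScanExecTransformerBase subclasses (sketch).
    case class SomeScanTransformer(
        /* ...existing fields... */
        override val pushDownFilters: Option[Seq[Expression]] = None)
      extends FileSourceScanExecTransformerBase(/* ... */) {
      override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
        copy(pushDownFilters = Some(filters))
    }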
diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
index ba430eb8d1..b320ccef33 100644
--- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
+++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/HudiScanTransformer.scala
@@ -35,7 +35,8 @@ case class HudiScanTransformer(
override val optionalNumCoalescedBuckets: Option[Int],
override val dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier],
- override val disableBucketedScan: Boolean = false)
+ override val disableBucketedScan: Boolean = false,
+ override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
output,
@@ -66,9 +67,13 @@ case class HudiScanTransformer(
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None,
- disableBucketedScan
+ disableBucketedScan,
+ pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
)
}
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
}
object HudiScanTransformer {
diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
index e5b098eaa8..ba1e2e399f 100644
--- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/IcebergScanTransformer.scala
@@ -47,7 +47,8 @@ case class IcebergScanTransformer(
override val runtimeFilters: Seq[Expression],
@transient override val table: Table,
override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
- override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None)
+ override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
+ override val pushDownFilters: Option[Seq[Expression]] = None)
extends BatchScanExecTransformerBase(
output = output,
scan = scan,
@@ -62,6 +63,10 @@ case class IcebergScanTransformer(
   // So use Metric to get NumSplits, NumDeletes is not reported by native metric
   private val numSplits = SQLMetrics.createMetric(sparkContext, new NumSplits().description())
 
+  override def withNewPushdownFilters(filters: Seq[Expression]): BatchScanExecTransformerBase = {
+    this.copy(pushDownFilters = Some(filters))
+  }
+
   protected[this] def supportsBatchScan(scan: Scan): Boolean = {
     IcebergScanTransformer.supportsBatchScan(scan)
   }
@@ -188,7 +193,8 @@ case class IcebergScanTransformer(
       output = output.map(QueryPlan.normalizeExpressions(_, output)),
       runtimeFilters = QueryPlan.normalizePredicates(
         runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
+        output),
+      pushDownFilters = pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
// Needed for tests
diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
index 2f5e277d03..05688c44b6 100644
--- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
+++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/MicroBatchScanExecTransformer.scala
@@ -64,7 +64,11 @@ case class MicroBatchScanExecTransformer(
@transient override lazy val inputPartitionsShim: Seq[InputPartition] =
stream.planInputPartitions(start, end)
- override def filterExprs(): Seq[Expression] = Seq.empty
+ override def scanFilters: Seq[Expression] = Seq.empty
+
+ override def supportPushDownFilters: Boolean = false
+
+  def pushDownFilters: Option[Seq[Expression]] = None
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
diff --git a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
index 5dbce795a9..05d41590c9 100644
--- a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
+++ b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -52,7 +52,8 @@ case class PaimonScanTransformer(
override val runtimeFilters: Seq[Expression],
@transient override val table: Table,
override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
- override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None)
+ override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
+ override val pushDownFilters: Option[Seq[Expression]] = None)
extends BatchScanExecTransformerBase(
output = output,
scan = scan,
@@ -74,8 +75,6 @@ case class PaimonScanTransformer(
throw new GlutenNotSupportException("Only support PaimonScan.")
}
- override def filterExprs(): Seq[Expression] = pushdownFilters
-
override def getPartitionSchema: StructType = scan match {
case paimonScan: PaimonScan =>
val partitionKeys = paimonScan.table.partitionKeys()
@@ -86,6 +85,10 @@ case class PaimonScanTransformer(
override def getDataSchema: StructType = new StructType()
+  override def withNewPushdownFilters(filters: Seq[Expression]): PaimonScanTransformer = {
+    this.copy(pushDownFilters = Some(filters))
+  }
+
override lazy val fileFormat: ReadFileFormat = {
val formatStr = coreOptions.fileFormatString()
if ("parquet".equalsIgnoreCase(formatStr)) {
@@ -180,7 +183,8 @@ case class PaimonScanTransformer(
       output = output.map(QueryPlan.normalizeExpressions(_, output)),
       runtimeFilters = QueryPlan.normalizePredicates(
         runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
+        output),
+      pushDownFilters = pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index a8ffca51ba..980c04e99e 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -38,7 +38,6 @@ import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.FileFormat
-import org.apache.spark.sql.execution.datasources.v2.FileScan
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.execution.metric.SQLMetric
@@ -652,41 +651,15 @@ trait SparkPlanExecApi {
   def rewriteSpillPath(path: String): String = path
 
-  /**
-   * Vanilla spark just push down part of filter condition into scan, however gluten can push down
-   * all filters. This function calculates the remaining conditions in FilterExec, add into the
-   * dataFilters of the leaf node.
-   * @param extraFilters:
-   *   Conjunctive Predicates, which are split from the upper FilterExec
-   * @param sparkExecNode:
-   *   The vanilla leaf node of the plan tree, which is FileSourceScanExec or BatchScanExec
-   * @return
-   *   return all push down filters
-   */
-  def postProcessPushDownFilter(
-      extraFilters: Seq[Expression],
-      sparkExecNode: LeafExecNode): Seq[Expression] = {
-    def getPushedFilter(dataFilters: Seq[Expression]): Seq[Expression] = {
-      val pushedFilters =
-        dataFilters ++ FilterHandler.getRemainingFilters(dataFilters, extraFilters)
-      pushedFilters.filterNot(_.references.exists {
-        attr =>
-          BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
-      })
-    }
-    sparkExecNode match {
-      case fileSourceScan: FileSourceScanExecTransformerBase =>
-        getPushedFilter(fileSourceScan.dataFilters)
-      case batchScan: BatchScanExecTransformerBase =>
-        batchScan.scan match {
-          case fileScan: FileScan =>
-            getPushedFilter(fileScan.dataFilters)
-          case _ =>
-            // TODO: For data lake format use pushedFilters in SupportsPushDownFilters
-            extraFilters
-        }
-      case _ =>
-        throw new GlutenNotSupportException(s"${sparkExecNode.getClass.toString} is not supported.")
-    }
+  def supportPushDownFilterToScan(sparkExecNode: LeafExecNode): Boolean = true
+
+  /** Return whether the filter is supported in scan. */
+  def isSupportedScanFilter(filter: Expression, sparkExecNode: LeafExecNode): Boolean = {
+    ExpressionConverter.canReplaceWithExpressionTransformer(
+      ExpressionConverter.replaceAttributeReference(filter),
+      sparkExecNode.output) &&
+    (!filter.references.exists(
+      attr =>
+        BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)))
   }
def genGenerateTransformer(
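The default isSupportedScanFilter accepts a filter only if it can be replaced by a native expression transformer and it references no row-index metadata column; backends are expected to AND their own restrictions onto this via super, as the ClickHouse override above does. A hedged sketch of a custom backend doing the same; `MyBackendApi` and `myNativeFilterCheck` are hypothetical names, not part of the patch:

    // Hypothetical backend override chaining the default checks.
    trait MyBackendApi extends SparkPlanExecApi {
      def myNativeFilterCheck(filter: Expression): Boolean  // backend-specific restriction
      override def isSupportedScanFilter(filter: Expression, sparkExecNode: LeafExecNode): Boolean =
        myNativeFilterCheck(filter) && super.isSupportedScanFilter(filter, sparkExecNode)
    }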
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 463f540d94..24486efa54 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -114,7 +114,7 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
       cond
     } else {
       val remainingFilters =
-        FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(cond))
+        FilterHandler.subtractFilters(splitConjunctivePredicates(cond), scanFilters)
       remainingFilters.reduceLeftOption(And).orNull
     }
   }
@@ -312,16 +312,11 @@ object FilterHandler extends PredicateHelper {
     }
   }
 
-  /**
-   * Compare the semantics of the filter conditions pushed down to Scan and in the Filter.
-   *
-   * @param scanFilters
-   *   : the conditions pushed down into Scan
-   * @param filters
-   *   : the conditions in the Filter after the Scan
-   * @return
-   *   the filter conditions not pushed down into Scan.
-   */
-  def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
-    (filters.toSet -- scanFilters.toSet).toSeq
+  def subtractFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
+    (left.toSet -- right.toSet).toSeq
+  }
+
+  def combineFilters(left: Seq[Expression], right: Seq[Expression]): Seq[Expression] = {
+    (left.toSet ++ right.toSet).toSeq
+  }
 }
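subtractFilters and combineFilters are plain set operations over expression equality, so duplicates collapse and result ordering is not guaranteed. A worked example with hypothetical predicates over an attribute a (not from the patch):

    // Hypothetical predicates; results shown up to ordering.
    // subtractFilters(Seq(a > 1, a < 5, IsNotNull(a)), Seq(a < 5))
    //   == Seq(a > 1, IsNotNull(a))
    // combineFilters(Seq(a > 1, a < 5), Seq(a < 5, IsNotNull(a)))
    //   == Seq(a > 1, a < 5, IsNotNull(a))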
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
index ee6770bb17..5de0683e96 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala
@@ -37,7 +37,46 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource
   import org.apache.spark.sql.catalyst.util._
 
   /** Returns the filters that can be pushed down to native file scan */
-  def filterExprs(): Seq[Expression]
+  final def filterExprs(): Seq[Expression] = {
+    if (pushDownFilters.nonEmpty) {
+      val (_, scanFiltersNotInPushDownFilters) =
+        scanFilters.partition(pushDownFilters.get.contains(_))
+      // For filters that only exist in the scan, we need to check if they are supported.
+      val unsupportedFilters = scanFiltersNotInPushDownFilters.filter(
+        !BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+      if (unsupportedFilters.nonEmpty) {
+        throw new UnsupportedOperationException(
+          "Found unsupported filter in scan " + unsupportedFilters.mkString(", "))
+      }
+      val supportedPushDownFilters = pushDownFilters.get
+        .filter(BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+      FilterHandler.combineFilters(supportedPushDownFilters, scanFiltersNotInPushDownFilters)
+    } else {
+      // todo: When PushDownFilterToScan is not performed, find a way to throw an
+      // exception to trigger scan fallback when encountering unsupported scan filters,
+      // instead of simply filtering them out.
+      scanFilters.filter(
+        BackendsApiManager.getSparkPlanExecApiInstance.isSupportedScanFilter(_, this))
+    }
+  }
+
+  /** Returns the filters that already exist in the scan. */
+  def scanFilters: Seq[Expression]
+
+  /** Whether the scan supports push-down filters. */
+  def supportPushDownFilters: Boolean = true
+
+  /**
+   * Returns the filters pushed by
+   * [[org.apache.gluten.extension.columnar.PushDownFilterToScan]].
+   */
+  def pushDownFilters: Option[Seq[Expression]]
+
+  /** Copies the scan with the filters pushed down from the filter node. */
+  def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer = {
+    throw new UnsupportedOperationException(
+      s"${getClass.toString} does not support push down filters.")
+  }
def getMetadataColumns(): Seq[AttributeReference]
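So filterExprs() now has two regimes: with pushed filters, every scan-only filter must be supported (otherwise the scan throws, triggering fallback), and the result is the supported pushed filters plus those scan-only filters; without pushed filters, unsupported scan filters are silently dropped (see the todo above). A worked example with hypothetical predicates p1, p2, p3, where only p3 is unsupported by the backend:

    // scanFilters = Seq(p1, p3), pushDownFilters = Some(Seq(p1, p2, p3))
    //   -> scan-only set is empty; result == Seq(p1, p2)  (p3 dropped from pushed set)
    // scanFilters = Seq(p1, p3), pushDownFilters = None
    //   -> result == Seq(p1)                              (p3 silently filtered out)
    // scanFilters = Seq(p3),     pushDownFilters = Some(Seq(p1))
    //   -> p3 is scan-only and unsupported -> UnsupportedOperationException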
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
index 1d0a38e887..c841f816e7 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BatchScanExecTransformer.scala
@@ -17,7 +17,6 @@
 package org.apache.gluten.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.expression.ExpressionConverter
 import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -34,6 +33,8 @@ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExecShim, FileScan}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
+import com.google.common.base.Objects
+
/** Columnar Based BatchScanExec. */
case class BatchScanExecTransformer(
override val output: Seq[AttributeReference],
@@ -44,7 +45,8 @@ case class BatchScanExecTransformer(
@transient override val table: Table,
override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None,
override val applyPartialClustering: Boolean = false,
- override val replicatePartitions: Boolean = false)
+ override val replicatePartitions: Boolean = false,
+ override val pushDownFilters: Option[Seq[Expression]] = None)
extends BatchScanExecTransformerBase(
output,
scan,
@@ -65,9 +67,14 @@ case class BatchScanExecTransformer(
       output = output.map(QueryPlan.normalizeExpressions(_, output)),
       runtimeFilters = QueryPlan.normalizePredicates(
         runtimeFilters.filterNot(_ == DynamicPruningExpression(Literal.TrueLiteral)),
-        output)
+        output),
+      pushDownFilters = pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
     )
   }
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BatchScanExecTransformerBase = {
+    this.copy(pushDownFilters = Some(filters))
+  }
 }
abstract class BatchScanExecTransformerBase(
@@ -102,30 +109,14 @@ abstract class BatchScanExecTransformerBase(
postDriverMetrics()
}
-  // Similar to the problem encountered in https://github.com/oap-project/gluten/pull/3184,
-  // we cannot add member variables to BatchScanExecTransformerBase, which inherits from case
-  // class. Otherwise, we will encounter an issue where makeCopy cannot find a constructor
-  // with the corresponding number of parameters.
-  // The workaround is to add a mutable list to pass in pushdownFilters.
-  protected var pushdownFilters: Seq[Expression] = scan match {
-    case fileScan: FileScan =>
-      fileScan.dataFilters.filter {
-        expr =>
-          ExpressionConverter.canReplaceWithExpressionTransformer(
-            ExpressionConverter.replaceAttributeReference(expr),
-            output)
-      }
+  override def scanFilters: Seq[Expression] = scan match {
+    case fileScan: FileScan => fileScan.dataFilters
     case _ =>
-      logInfo(s"${scan.getClass.toString} does not support push down filters")
+      // todo: support other DSv2 scans
+      logInfo(s"${scan.getClass.toString} does not support extracting scan filters.")
       Seq.empty
   }
- def setPushDownFilters(filters: Seq[Expression]): Unit = {
- pushdownFilters = filters
- }
-
- override def filterExprs(): Seq[Expression] = pushdownFilters
-
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
override def getPartitions: Seq[Partition] = finalPartitions
@@ -198,6 +189,15 @@ abstract class BatchScanExecTransformerBase(
@transient override lazy val fileFormat: ReadFileFormat =
BackendsApiManager.getSettings.getSubstraitReadFileFormatV2(scan)
+ override def equals(other: Any): Boolean = other match {
+ case other: BatchScanExecTransformerBase =>
+ this.pushDownFilters == other.pushDownFilters && super.equals(other)
+ case _ =>
+ false
+ }
+
+  override def hashCode(): Int = Objects.hashCode(batch, runtimeFilters, pushDownFilters)
+
   override def simpleString(maxFields: Int): String = {
     val truncatedOutputString = truncatedString(output, "[", ", ", "]", maxFields)
     val runtimeFiltersString = s"RuntimeFilters: ${runtimeFilters.mkString("[", ",", "]")}"
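Since pushDownFilters now participates in equals and hashCode, two otherwise identical batch scans that differ only in pushed filters no longer compare equal, which matters for plan canonicalization and exchange/subquery reuse. An illustrative check; `scanA` and `pred` are hypothetical values, not identifiers from the patch:

    // Hypothetical: equality is sensitive to pushed filters.
    val scanB = scanA.withNewPushdownFilters(Seq(pred))
    assert(scanA != scanB) // pushDownFilters: None vs Some(Seq(pred))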
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
index c00ed1f671..1e7de78d94 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/FileSourceScanExecTransformer.scala
@@ -17,7 +17,6 @@
package org.apache.gluten.execution
import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.expression.ExpressionConverter
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -33,7 +32,6 @@ import org.apache.spark.sql.execution.FileSourceScanExecShim
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SparkVersionUtil
import org.apache.spark.util.collection.BitSet
import org.apache.commons.lang3.StringUtils
@@ -47,7 +45,8 @@ case class FileSourceScanExecTransformer(
override val optionalNumCoalescedBuckets: Option[Int],
override val dataFilters: Seq[Expression],
override val tableIdentifier: Option[TableIdentifier],
- override val disableBucketedScan: Boolean = false)
+ override val disableBucketedScan: Boolean = false,
+ override val pushDownFilters: Option[Seq[Expression]] = None)
extends FileSourceScanExecTransformerBase(
relation,
output,
@@ -71,9 +70,13 @@ case class FileSourceScanExecTransformer(
optionalNumCoalescedBuckets,
QueryPlan.normalizePredicates(dataFilters, output),
None,
- disableBucketedScan
+ disableBucketedScan,
+ pushDownFilters.map(QueryPlan.normalizePredicates(_, output))
)
}
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): FileSourceScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
}
abstract class FileSourceScanExecTransformerBase(
@@ -104,22 +107,7 @@ abstract class FileSourceScanExecTransformerBase(
.genFileSourceScanTransformerMetrics(sparkContext)
.filter(m => !driverMetricsAlias.contains(m._1)) ++ driverMetricsAlias
- override def filterExprs(): Seq[Expression] = dataFiltersInScan.filter {
- expr =>
- ExpressionConverter.canReplaceWithExpressionTransformer(
- ExpressionConverter.replaceAttributeReference(expr),
- output)
- }
-
- override def dataFiltersInScan: Seq[Expression] = {
- if (SparkVersionUtil.gteSpark35) {
- dataFilters.filterNot(_.references.exists {
-        attr =>
-          BackendsApiManager.getSparkPlanExecApiInstance.isRowIndexMetadataColumn(attr.name)
- })
- } else {
- super.dataFiltersInScan
- }
- }
+ override def scanFilters: Seq[Expression] = dataFilters
override def getMetadataColumns(): Seq[AttributeReference] = metadataColumns
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala
index e0da91dd7c..9a6e271b35 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/PushDownFilterToScan.scala
@@ -17,7 +17,7 @@
 package org.apache.gluten.extension.columnar
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.{BatchScanExecTransformerBase, FileSourceScanExecTransformer, FilterExecTransformerBase}
+import org.apache.gluten.execution.{BasicScanExecTransformer, FilterExecTransformerBase}
 
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -31,35 +31,15 @@ object PushDownFilterToScan extends Rule[SparkPlan] with PredicateHelper {
   override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
     case filter: FilterExecTransformerBase =>
       filter.child match {
-        case fileScan: FileSourceScanExecTransformer =>
-          val pushDownFilters =
-            BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
-              splitConjunctivePredicates(filter.cond),
-              fileScan)
-          val newScan = fileScan.copy(dataFilters = pushDownFilters)
+        case scan: BasicScanExecTransformer
+          if BackendsApiManager.getSparkPlanExecApiInstance.supportPushDownFilterToScan(
+            scan) && scan.supportPushDownFilters =>
+          val newScan =
+            scan.withNewPushdownFilters(splitConjunctivePredicates(filter.cond))
           if (newScan.doValidate().ok()) {
             filter.withNewChildren(Seq(newScan))
           } else {
             filter
           }
-        case batchScan: BatchScanExecTransformerBase =>
-          val pushDownFilters =
-            BackendsApiManager.getSparkPlanExecApiInstance.postProcessPushDownFilter(
-              splitConjunctivePredicates(filter.cond),
-              batchScan)
-          // If BatchScanExecTransformerBase's parent is filter, pushdownFilters can't be None.
-          batchScan.setPushDownFilters(Seq.empty)
-          val newScan = batchScan
-          if (pushDownFilters.nonEmpty) {
-            newScan.setPushDownFilters(pushDownFilters)
-            if (newScan.doValidate().ok()) {
-              filter.withNewChildren(Seq(newScan))
-            } else {
-              filter
-            }
-          } else {
-            filter
-          }
         case _ => filter
       }
   }
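The rewritten rule treats every BasicScanExecTransformer uniformly: split the filter condition into conjuncts, hand them to the scan via withNewPushdownFilters, and keep the new scan only if it still validates. An illustrative before/after plan for a hypothetical query over lineitem:

    // Before PushDownFilterToScan:
    //   FilterExecTransformer(l_quantity < 24.0)
    //   +- FileSourceScanExecTransformer(lineitem, pushDownFilters = None)
    // After (assuming newScan.doValidate().ok()):
    //   FilterExecTransformer(l_quantity < 24.0)
    //   +- FileSourceScanExecTransformer(lineitem,
    //        pushDownFilters = Some(Seq(l_quantity < 24.0)))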
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
index 6d0b16fabb..8bf3e74ac0 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/hive/HiveTableScanExecTransformer.scala
@@ -63,7 +63,11 @@ case class HiveTableScanExecTransformer(
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata)
- override def filterExprs(): Seq[Expression] = Seq.empty
+ override def scanFilters: Seq[Expression] = Seq.empty
+
+ override def supportPushDownFilters: Boolean = false
+
+ override def pushDownFilters: Option[Seq[Expression]] = None
override def getMetadataColumns(): Seq[AttributeReference] = Seq.empty
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index 101d6fb342..4356467a6d 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -29,8 +29,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 884e62d5b9..2857a4a680 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -38,7 +38,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -82,4 +83,7 @@ case class TestFileSourceScanExecTransformer(
       disableBucketedScan
     )
   }
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index 04a4c05b9d..76a9a6ef95 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 9f5d47b60b..6aced7cfa6 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -65,4 +66,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index 000103b903..76a9a6ef95 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 9f5d47b60b..6aced7cfa6 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -65,4 +66,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index e47deb0d70..76a9a6ef95 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index 9f5d47b60b..6aced7cfa6 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
    output,
@@ -65,4 +66,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
index e47deb0d70..76a9a6ef95 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/datasources/orc/GlutenOrcV2SchemaPruningSuite.scala
@@ -34,8 +34,10 @@ class GlutenOrcV2SchemaPruningSuite extends OrcV2SchemaPruningSuite with GlutenS
   override def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
     val fileSourceScanSchemata =
       collect(df.queryExecution.executedPlan) {
-        case BatchScanExec(_, scan: OrcScan, _, _, _, _) => scan.readDataSchema
-        case BatchScanExecTransformer(_, scan: OrcScan, _, _, _, _, _, _, _) => scan.readDataSchema
+        case b: BatchScanExec if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
+        case b: BatchScanExecTransformer if b.scan.isInstanceOf[OrcScan] =>
+          b.scan.asInstanceOf[OrcScan].readDataSchema
       }
     assert(
       fileSourceScanSchemata.size === expectedSchemaCatalogStrings.size,
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
index e805d175be..644e3c5839 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/extension/TestFileSourceScanExecTransformer.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.extension
 
 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.execution.FileSourceScanExecTransformerBase
+import org.apache.gluten.execution.{BasicScanExecTransformer, FileSourceScanExecTransformerBase}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
@@ -37,7 +37,8 @@ case class TestFileSourceScanExecTransformer(
     override val optionalNumCoalescedBuckets: Option[Int],
     override val dataFilters: Seq[Expression],
     override val tableIdentifier: Option[TableIdentifier],
-    override val disableBucketedScan: Boolean = false)
+    override val disableBucketedScan: Boolean = false,
+    override val pushDownFilters: Option[Seq[Expression]] = None)
   extends FileSourceScanExecTransformerBase(
     relation,
     output,
@@ -64,4 +65,7 @@ case class TestFileSourceScanExecTransformer(
     getPartitions.map((_, fileFormat))
 
   override val nodeNamePrefix: String = "TestFile"
+
+  override def withNewPushdownFilters(filters: Seq[Expression]): BasicScanExecTransformer =
+    copy(pushDownFilters = Some(filters))
 }
diff --git a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
index 234762f2f7..75049793d8 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/FileSourceScanExecShim.scala
@@ -58,10 +58,6 @@ abstract class FileSourceScanExecShim(
   protected lazy val driverMetricsAlias = driverMetrics
 
-  def dataFiltersInScan: Seq[Expression] = {
-    throw new UnsupportedOperationException("Not implemented")
-  }
-
   def hasUnsupportedColumns: Boolean = {
     // TODO, fallback if user define same name column due to we can't right now
     // detect which column is metadata column which is user defined column.