This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 0983ec2023 [GLUTEN-7283][CORE] Support DynamicPruningExpression conversion (#7284)
0983ec2023 is described below
commit 0983ec2023dfbc09af37f7dcb85bc5c604ef8498
Author: Zhen Wang <[email protected]>
AuthorDate: Thu Sep 26 09:24:10 2024 +0800
[GLUTEN-7283][CORE] Support DynamicPruningExpression conversion (#7284)
Closes #7283
---
.../gluten/expression/ExpressionConverter.scala | 3 +-
.../gluten/expression/ExpressionMappings.scala | 1 +
.../sql/GlutenDynamicPartitionPruningSuite.scala | 43 ++++++++++++++++++++++
.../apache/gluten/expression/ExpressionNames.scala | 1 +
4 files changed, 47 insertions(+), 1 deletion(-)
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index d14a591277..541f29fe31 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -450,7 +450,8 @@ object ExpressionConverter extends SQLConfHelper with Logging {
if child.dataType
.isInstanceOf[DecimalType] &&
!BackendsApiManager.getSettings.transformCheckOverflow =>
replaceWithExpressionTransformer0(child, attributeSeq, expressionsMap)
- case _: NormalizeNaNAndZero | _: PromotePrecision | _: TaggingExpression =>
+ case _: NormalizeNaNAndZero | _: PromotePrecision | _: TaggingExpression |
+ _: DynamicPruningExpression =>
ChildTransformer(
substraitExprName,
replaceWithExpressionTransformer0(expr.children.head, attributeSeq, expressionsMap),
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index e2f7971ad6..f3896cf954 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -281,6 +281,7 @@ object ExpressionMappings {
Sig[In](IN),
Sig[InSet](IN_SET),
Sig[ScalarSubquery](SCALAR_SUBQUERY),
+ Sig[DynamicPruningExpression](DYNAMIC_PRUNING_EXPRESSION),
Sig[CheckOverflow](CHECK_OVERFLOW),
Sig[MakeDecimal](MAKE_DECIMAL),
Sig[PromotePrecision](PROMOTE_PRECISION),
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
index 559aaada4a..94a6e3bdc2 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenDynamicPartitionPruningSuite.scala
@@ -776,6 +776,49 @@ class GlutenDynamicPartitionPruningV1SuiteAEOn
checkAnswer(df, Row(1000, 1) :: Row(1010, 2) :: Row(1020, 2) :: Nil)
}
}
+
+ testGluten("Filter with DynamicPruningExpression") {
+ withTable("fact_stats_non_partition") {
+ spark
+ .table("fact_stats")
+ .write
+ .format(tableFormat)
+ .saveAsTable("fact_stats_non_partition")
+
+ withSQLConf(
+ SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
+ SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
+ val df = sql("""
+ |SELECT f.date_id, f.product_id, f.units_sold, f.store_id FROM (
+ | select * from fact_stats
+ | union all
+ | select * from fact_stats_non_partition
+ |) f
+ |JOIN dim_stats s
+ |ON f.store_id = s.store_id WHERE s.country = 'DE'
+ """.stripMargin)
+ checkAnswer(
+ df,
+ Row(1030, 2, 10, 3) ::
+ Row(1040, 2, 50, 3) ::
+ Row(1050, 2, 50, 3) ::
+ Row(1060, 2, 50, 3) ::
+ Row(1030, 2, 10, 3) ::
+ Row(1040, 2, 50, 3) ::
+ Row(1050, 2, 50, 3) ::
+ Row(1060, 2, 50, 3) :: Nil
+ )
+ val filters = collect(df.queryExecution.executedPlan) { case f: FilterExec => f }
+ assert(filters.isEmpty)
+ val filterTransformerWithDPPs = collect(df.queryExecution.executedPlan) {
+ case f: FilterExecTransformerBase
+ if f.cond.exists(_.isInstanceOf[DynamicPruningExpression]) =>
+ f
+ }
+ assert(filterTransformerWithDPPs.nonEmpty)
+ }
+ }
+ }
}
abstract class GlutenDynamicPartitionPruningV2Suite extends GlutenDynamicPartitionPruningSuiteBase {
diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index 0e08e013cb..a4f2e7174d 100644
--- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -307,6 +307,7 @@ object ExpressionNames {
final val IN = "in"
final val IN_SET = "in_set"
final val SCALAR_SUBQUERY = "scalar_subquery"
+ final val DYNAMIC_PRUNING_EXPRESSION = "dynamic_pruning_expression"
final val AGGREGATE = "aggregate"
final val LAMBDAFUNCTION = "lambdafunction"
final val EXPLODE = "explode"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]