This is an automated email from the ASF dual-hosted git repository. yumwang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 78700d939c4 [SPARK-38797][SQL] Runtime Filter supports pruning side has window 78700d939c4 is described below commit 78700d939c42404ce6bd420094e13a258875949b Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Thu Apr 14 08:39:15 2022 +0800 [SPARK-38797][SQL] Runtime Filter supports pruning side has window ### What changes were proposed in this pull request? 1. Makes row-level runtime filtering support pruning side has window. For example: ```sql SELECT * FROM (SELECT *, Row_number() OVER ( partition BY c1 ORDER BY f1) rn FROM bf1) bf1 JOIN bf2 ON bf1.c1 = bf2.c2 WHERE bf2.a2 = 62 ``` After this PR: ``` == Optimized Logical Plan == Join Inner, (c1#45922 = c2#45928), Statistics(sizeInBytes=12.3 MiB) :- Window [row_number() windowspecdefinition(c1#45922, f1#45925 ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rn#45976], [c1#45922], [f1#45925 ASC NULLS FIRST], Statistics(sizeInBytes=3.7 KiB) : +- Filter (isnotnull(c1#45922) AND might_contain(scalar-subquery#45993 [], xxhash64(c1#45922, 42))), Statistics(sizeInBytes=3.3 KiB) : : +- Aggregate [bloom_filter_agg(xxhash64(c2#45928, 42), 1000000, 8388608, 0, 0) AS bloomFilter#45992], Statistics(sizeInBytes=108.0 B, rowCount=1) : : +- Project [c2#45928], Statistics(sizeInBytes=1278.0 B) : : +- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB) : : +- Relation default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, Statistics(sizeInBytes=3.3 KiB) : +- Relation default.bf1[a1#45920,b1#45921,c1#45922,d1#45923,e1#45924,f1#45925] parquet, Statistics(sizeInBytes=3.3 KiB) +- Filter ((isnotnull(a2#45926) AND (a2#45926 = 62)) AND isnotnull(c2#45928)), Statistics(sizeInBytes=3.3 KiB) +- Relation default.bf2[a2#45926,b2#45927,c2#45928,d2#45929,e2#45930,f2#45931] parquet, Statistics(sizeInBytes=3.3 KiB) ``` 2. 
Make sure injected filters can be pushed through a shuffle when the current join is a broadcast join. ### Why are the changes needed? Improve query performance. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #36080 from wangyum/SPARK-38797. Lead-authored-by: Yuming Wang <yumw...@ebay.com> Co-authored-by: Yuming Wang <wgy...@gmail.com> Signed-off-by: Yuming Wang <yumw...@ebay.com> --- .../catalyst/optimizer/InjectRuntimeFilter.scala | 5 +++-- .../spark/sql/InjectRuntimeFilterSuite.scala | 26 ++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala index 134292ae30d..01c1786e05a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala @@ -141,6 +141,7 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J plan.exists { case Join(left, right, _, _, hint) => isProbablyShuffleJoin(left, right, hint) case _: Aggregate => true + case _: Window => true case _ => false } } @@ -172,8 +173,8 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J /** * Check that: - * - The filterApplicationSideJoinExp can be pushed down through joins and aggregates (ie the - * expression references originate from a single leaf node) + * - The filterApplicationSideJoinExp can be pushed down through joins, aggregates and windows + * (ie the expression references originate from a single leaf node) * - The filter creation side has a selective predicate * - The current join is a shuffle join or a broadcast join that has a shuffle below it * - The max filterApplicationSide scan size is greater than a configurable threshold diff
--git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala index 726fa341b5c..6065f232109 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala @@ -539,4 +539,30 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp """.stripMargin) } } + + test("Runtime Filter supports pruning side has Aggregate") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000") { + assertRewroteWithBloomFilter( + """ + |SELECT * + |FROM (SELECT c1 AS aliased_c1, d1 FROM bf1 GROUP BY c1, d1) bf1 + | JOIN bf2 ON bf1.aliased_c1 = bf2.c2 + |WHERE bf2.a2 = 62 + """.stripMargin) + } + } + + test("Runtime Filter supports pruning side has Window") { + withSQLConf(SQLConf.RUNTIME_BLOOM_FILTER_APPLICATION_SIDE_SCAN_SIZE_THRESHOLD.key -> "3000") { + assertRewroteWithBloomFilter( + """ + |SELECT * + |FROM (SELECT *, + | Row_number() OVER (PARTITION BY c1 ORDER BY f1) rn + | FROM bf1) bf1 + | JOIN bf2 ON bf1.c1 = bf2.c2 + |WHERE bf2.a2 = 62 + """.stripMargin) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org