This is an automated email from the ASF dual-hosted git repository.

yumwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 0cc732a262e  [SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty
0cc732a262e is described below

commit 0cc732a262ee3fe504bdad57b077d23a1a5d2287
Author: Yuming Wang <yumw...@ebay.com>
AuthorDate: Fri Jun 24 11:14:28 2022 +0800

    [SPARK-39511][SQL] Enhance push down local limit 1 for right side of left semi/anti join if join condition is empty

    ### What changes were proposed in this pull request?

    This PR enhances https://github.com/apache/spark/pull/35216 to support more cases. Before this PR, it only supported a left semi/anti join directly followed by a limit:
    ```sql
    SELECT * FROM t1 LEFT SEMI JOIN t2 LIMIT 10;
    SELECT * FROM t1 LEFT ANTI JOIN t2 LIMIT 10;
    ```
    After this PR, that limitation is gone, and IN / NOT IN subqueries are supported as well:
    ```sql
    SELECT * FROM t1 LEFT SEMI JOIN t2;
    SELECT * FROM t1 LEFT ANTI JOIN t2;
    SELECT * FROM v1 WHERE literal IN (SELECT id FROM t2);
    SELECT * FROM v1 WHERE literal NOT IN (SELECT id FROM t2);
    ```

    ### Why are the changes needed?

    Improve query performance. For example:
    ```sql
    CREATE TABLE t1(id int) using parquet;
    CREATE TABLE t2(id int, type string) using parquet;
    CREATE TEMP VIEW v1 AS SELECT id, 't' AS type FROM t1;
    EXPLAIN EXTENDED SELECT * FROM v1 WHERE type IN (SELECT type FROM t2);
    ```

    Before this PR:
    ```
    === Result of Batch RewriteSubquery ===
     Project [id#241, t AS type#246]                             Project [id#241, t AS type#246]
    !+- Filter t IN (list#243 [])                                +- Join LeftSemi, (t = type#248)
    !   :  +- Project [type#248]                                    :- Relation default.t1[id#241] parquet
    !   :     +- Relation default.t2[id#247,type#248] parquet      +- Project [type#248]
    !   +- Relation default.t1[id#241] parquet                        +- Relation default.t2[id#247,type#248] parquet
    ```

    After this PR:
    ```
    === Result of Batch RewriteSubquery ===
     Project [id#241, t AS type#246]                             Project [id#241, t AS type#246]
    !+- Filter t IN (list#243 [])                                +- Join LeftSemi
    !   :  +- Project [type#248]                                    :- Relation default.t1[id#241] parquet
    !   :     +- Relation default.t2[id#247,type#248] parquet      +- GlobalLimit 1
    !   +- Relation default.t1[id#241] parquet                        +- LocalLimit 1
    !                                                                    +- Project
    !                                                                       +- Filter (t = type#248)
    !                                                                          +- Relation default.t2[id#247,type#248] parquet
    ```

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Unit test.

    Closes #36909 from wangyum/SPARK-39511.

    Lead-authored-by: Yuming Wang <yumw...@ebay.com>
    Co-authored-by: Yuming Wang <wgy...@gmail.com>
    Signed-off-by: Yuming Wang <yumw...@ebay.com>
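For intuition about why this rewrite is safe, here is a minimal plain-Scala sketch (illustrative only, not code from the patch): with an empty join condition, only the existence of a right-side row matters, so truncating the right side to a single row cannot change the result.

```scala
// Plain-Scala model of condition-less LEFT SEMI / LEFT ANTI join semantics.
object SemiAntiNoCondition {
  // LEFT SEMI with no condition: keep all left rows iff the right side is non-empty.
  def leftSemi[L, R](left: Seq[L], right: Seq[R]): Seq[L] =
    if (right.nonEmpty) left else Seq.empty

  // LEFT ANTI with no condition: keep all left rows iff the right side is empty.
  def leftAnti[L, R](left: Seq[L], right: Seq[R]): Seq[L] =
    if (right.isEmpty) left else Seq.empty

  def main(args: Array[String]): Unit = {
    val left = Seq(1, 2, 3)
    val right = Seq("a", "b", "c")
    // Limiting the right side to one row changes neither result,
    // which is exactly what pushing LocalLimit(1) relies on.
    assert(leftSemi(left, right) == leftSemi(left, right.take(1)))
    assert(leftAnti(left, right) == leftAnti(left, right.take(1)))
  }
}
```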
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 11 ++++++----
 .../catalyst/optimizer/LimitPushdownSuite.scala    | 24 +++++++++++++++++++++-
 .../scala/org/apache/spark/sql/SubquerySuite.scala | 18 +++++++++++++++-
 3 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 4304f475655..eda42a9adbe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -235,6 +235,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       CheckCartesianProducts) :+
     Batch("RewriteSubquery", Once,
       RewritePredicateSubquery,
+      PushPredicateThroughJoin,
+      LimitPushDown,
       ColumnPruning,
       CollapseProject,
       RemoveRedundantAliases,
@@ -710,15 +712,13 @@ object LimitPushDown extends Rule[LogicalPlan] {
           left = maybePushLocalLimit(limitExpr, join.left),
           right = maybePushLocalLimit(limitExpr, join.right))
       case LeftSemi | LeftAnti if join.condition.isEmpty =>
-        join.copy(
-          left = maybePushLocalLimit(limitExpr, join.left),
-          right = maybePushLocalLimit(Literal(1, IntegerType), join.right))
+        join.copy(left = maybePushLocalLimit(limitExpr, join.left))
       case _ => join
     }
   }

   def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
-    _.containsPattern(LIMIT), ruleId) {
+    _.containsAnyPattern(LIMIT, LEFT_SEMI_OR_ANTI_JOIN), ruleId) {
     // Adding extra Limits below UNION ALL for children which are not Limit or do not have Limit
     // descendants whose maxRow is larger. This heuristic is valid assuming there does not exist any
     // Limit push-down rule that is unable to infer the value of maxRows.
@@ -753,6 +753,9 @@ object LimitPushDown extends Rule[LogicalPlan] {
     // Merge offset value and limit value into LocalLimit and pushes down LocalLimit through Offset.
     case LocalLimit(le, Offset(oe, grandChild)) =>
       Offset(oe, LocalLimit(Add(le, oe), grandChild))
+    // Push down local limit 1 if join type is LeftSemiOrAnti and join condition is empty.
+    case j @ Join(_, right, LeftSemiOrAnti(_), None, _) if !right.maxRows.exists(_ <= 1) =>
+      j.copy(right = maybePushLocalLimit(Literal(1, IntegerType), right))
   }
 }
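The guard `!right.maxRows.exists(_ <= 1)` on the new case is what keeps the rule from re-firing once a limit is in place: `LogicalPlan.maxRows` is an `Option[Long]`, so the rewrite is skipped whenever the right side is already known to produce at most one row. A small sketch of the guard's behavior (the values below are made up for illustration):

```scala
// Illustrative values, not taken from a real plan.
val alreadyCapped: Option[Long] = Some(1L) // e.g. right is already LocalLimit(1, ...)
val unknownRows: Option[Long] = None       // e.g. a plain table scan

assert(alreadyCapped.exists(_ <= 1))  // guard !exists(...) is false -> plan left unchanged
assert(!unknownRows.exists(_ <= 1))   // guard !exists(...) is true  -> push LocalLimit(1)
```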
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
index 1f19ac77c94..9c093bda263 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions.Add
-import org.apache.spark.sql.catalyst.plans.{Cross, FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, PlanTest, RightOuter}
+import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._

@@ -276,4 +276,26 @@ class LimitPushdownSuite extends PlanTest {
       Optimize.execute(testRelation.offset(2).limit(1).analyze),
       GlobalLimit(1, Offset(2, LocalLimit(3, testRelation))).analyze)
   }
+
+  test("SPARK-39511: Push limit 1 to right side if join type is LeftSemiOrAnti") {
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      comparePlans(
+        Optimize.execute(x.join(y, joinType).analyze),
+        x.join(LocalLimit(1, y), joinType).analyze)
+    }
+
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      comparePlans(
+        Optimize.execute(x.join(y.limit(2), joinType).analyze),
+        x.join(LocalLimit(1, y), joinType).analyze)
+    }
+
+    Seq(LeftSemi, LeftAnti).foreach { joinType =>
+      val originalQuery1 = x.join(LocalLimit(1, y), joinType).analyze
+      val originalQuery2 = x.join(y.limit(1), joinType).analyze
+
+      comparePlans(Optimize.execute(originalQuery1), originalQuery1)
+      comparePlans(Optimize.execute(originalQuery2), originalQuery2)
+    }
+  }
 }
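End to end, the rewrite is visible in the physical plan, which is what the SubquerySuite change below asserts. A rough spark-shell sketch of the same check, assuming the t1/t2/v1 objects from the PR description already exist:

```scala
import org.apache.spark.sql.execution.LocalLimitExec
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec

val df = spark.sql("SELECT * FROM v1 WHERE type IN (SELECT type FROM t2)")
// The rewritten LeftSemi join carries no join condition (the predicate was
// pushed into its right side), so it is planned as a broadcast nested loop join.
val join = df.queryExecution.sparkPlan.collectFirst {
  case b: BroadcastNestedLoopJoinExec => b
}
// After this change, the join's right child should be LocalLimitExec(1, ...).
join.foreach(j => println(j.right))
```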
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index b1057fd14bc..fa24e8d175b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer

 import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Sort}
-import org.apache.spark.sql.execution.{ColumnarToRowExec, ExecSubqueryExpression, FileSourceScanExec, InputAdapter, ReusedSubqueryExec, ScalarSubquery, SubqueryExec, WholeStageCodegenExec}
+import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecution}
 import org.apache.spark.sql.execution.datasources.FileScanRDD
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -2204,4 +2204,20 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
         |""".stripMargin),
       Row("2022-06-01"))
   }
+
+  test("SPARK-39511: Push limit 1 to right side if join type is Left Semi/Anti") {
+    withTable("t1", "t2") {
+      withTempView("v1") {
+        spark.sql("CREATE TABLE t1(id int) using parquet")
+        spark.sql("CREATE TABLE t2(id int, type string) using parquet")
+        spark.sql("CREATE TEMP VIEW v1 AS SELECT id, 't' AS type FROM t1")
+        val df = spark.sql("SELECT * FROM v1 WHERE type IN (SELECT type FROM t2)")
+        val join =
+          df.queryExecution.sparkPlan.collectFirst { case b: BroadcastNestedLoopJoinExec => b }
+        assert(join.nonEmpty)
+        assert(join.head.right.isInstanceOf[LocalLimitExec])
+        assert(join.head.right.asInstanceOf[LocalLimitExec].limit === 1)
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org