[GitHub] [spark] ulysses-you commented on a diff in pull request #37129: [SPARK-39710][SQL] Support push local topK through outer join
ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r941163219

##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through outer join:
+ *  - for a left outer join, the references of ordering of TopK come from the left side and
+ *    the limits of TopK is smaller than left side max rows
+ *  - for a right outer join, the references of ordering of TopK come from the right side and
+ *    the limits of TopK is smaller than right side max rows
+ *
+ * Note that, this rule only push down local topK to the bottom outer join which is different with
+ * [[LimitPushDown]]. This is to avoid regression due to the overhead of local sort.

Review Comment:
If the children are not shuffles, how about estimating the `minRowCount` and only pushing down if the limit of the TopK is much smaller than the `minRowCount`? E.g. the `minRowCount` of a left outer join should be that of the left side. I think there should be almost no performance-regression corner cases if we can actually save a lot of data before the join.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
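The `minRowCount` suggestion in the first comment above can be sketched outside Catalyst as follows. This is a minimal Scala model under stated assumptions: `OuterJoinType`, `preservedSideMinRows` and the `ratio` factor (a stand-in for "much smaller") are hypothetical names, and the per-side minimum row counts are plain inputs, not an existing Spark API. The fact it encodes is that an outer join emits at least one row per row of its preserved side, so that side's row count is a lower bound on the join output.

```scala
// Hypothetical model of the reviewer's suggestion; not Catalyst API.
object MinRowCountHeuristic {
  sealed trait OuterJoinType
  case object LeftOuterType extends OuterJoinType
  case object RightOuterType extends OuterJoinType
  case object OtherJoinType extends OuterJoinType

  // An outer join emits at least one row per row of its preserved side,
  // so that side's row count is a lower bound on the join output.
  def preservedSideMinRows(
      joinType: OuterJoinType,
      leftMinRows: Option[Long],
      rightMinRows: Option[Long]): Option[Long] = joinType match {
    case LeftOuterType  => leftMinRows
    case RightOuterType => rightMinRows
    case OtherJoinType  => None
  }

  // Push only when the lower bound is known and the limit is at least
  // `ratio` times smaller than it.
  def shouldPush(
      joinType: OuterJoinType,
      limit: Int,
      leftMinRows: Option[Long],
      rightMinRows: Option[Long],
      ratio: Int): Boolean =
    preservedSideMinRows(joinType, leftMinRows, rightMinRows)
      .exists(minRows => limit.toLong * ratio <= minRows)
}
```

With `ratio = 10`, a limit of 10 over a left side known to hold at least 1,000 rows would be pushed, while a limit of 200 would not; an unknown lower bound never triggers the rewrite.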
ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r941137063

##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,119 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through outer join:
+ *  - for a left outer join, the references of ordering of TopK come from the left side and
+ *    the limits of TopK is smaller than left side max rows
+ *  - for a right outer join, the references of ordering of TopK come from the right side and
+ *    the limits of TopK is smaller than right side max rows
+ *
+ * Note that, this rule only push down local topK to the bottom outer join which is different with
+ * [[LimitPushDown]]. This is to avoid regression due to the overhead of local sort.

Review Comment:
Yes, it will hit a small regression if the actual row count is smaller than the configured threshold. In that case we do one more local sort but cannot reduce the data size, so we can avoid the regression by configuring a small enough threshold. Or, to be more conservative, we can put this rule into the AQE optimizer, where we can decide whether to push down based on the accurate row count.
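The runtime variant mentioned in the reply above (deciding in the AQE optimizer once row counts are accurate) could look roughly like this. It is a minimal Scala sketch under stated assumptions: `partitionRowCounts` stands for per-partition row counts that would be known at runtime and `threshold` for the configured limit threshold; neither name comes from the PR. Because a `LocalLimit` keeps at most `limit` rows per partition, a partition with `n` rows drops `max(0, n - limit)` rows, so when no partition exceeds the limit the extra local sort is pure overhead, which is the regression described above.

```scala
// Hypothetical runtime (AQE-style) check; names are illustrative, not Spark API.
object RuntimeTopKPushDecision {
  // A per-partition LocalLimit(limit) keeps min(n, limit) of a partition's n rows.
  def rowsDropped(limit: Int, partitionRowCounts: Seq[Long]): Long =
    partitionRowCounts.map(n => math.max(0L, n - limit)).sum

  // Push only if the limit is within the configured threshold and the
  // extra local sort actually removes rows before the join.
  def shouldPush(limit: Int, threshold: Int, partitionRowCounts: Seq[Long]): Boolean =
    limit <= threshold && rowsDropped(limit, partitionRowCounts) > 0
}
```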
ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918870960

##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,118 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through other join:
+ *  - for a left outer join, the references of ordering of TopK come from the left side and
+ *    the limits of TopK is smaller than left side max rows
+ *  - for a right outer join, the references of ordering of TopK come from the right side and
+ *    the limits of TopK is smaller than right side max rows
+ */
+object PushLocalTopKThroughOuterJoin extends Rule[LogicalPlan] {
+  private def smallThan(limits: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
+    case Some(maxRows) => limits < maxRows
+    case _ => true

Review Comment:
Yes, that's why I added the new config to forbid pushing down a big limit. Then the worst case is one more local sort limited to that many rows, which has almost no effect.
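The guard discussed in this thread combines the quoted `smallThan` check with the new config. A minimal Scala sketch, assuming the config boils down to an integer upper bound on the pushed limit (`maxPushableLimit` is a hypothetical name; the actual config key is not shown here). Note that `smallThan` returns `true` when `maxRowsPerPartition` is unknown, so in that case the cap is the only thing bounding the cost of the added local sort.

```scala
// Sketch only: `maxPushableLimit` is a hypothetical stand-in for the new config.
object LocalTopKGuard {
  // Same logic as the quoted smallThan: an unknown max row count counts as "larger".
  def smallThan(limits: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
    case Some(maxRows) => limits < maxRows
    case None          => true
  }

  // Refuse big limits up front, then require the limit to actually cut rows
  // when the side's per-partition max row count is known.
  def canPush(limits: Int, maxRowsPerPartition: Option[Long], maxPushableLimit: Int): Boolean =
    limits <= maxPushableLimit && smallThan(limits, maxRowsPerPartition)
}
```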
ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918865962

##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -230,6 +230,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // non-nullable when an empty relation child of a Union is removed
       UpdateAttributeNullability) :+
     Batch("Optimize One Row Plan", fixedPoint, OptimizeOneRowPlan) :+
+    Batch("Push Local TopK Through Outer Join", FixedPoint(1), PushLocalTopKThroughOuterJoin) :+

Review Comment:
Actually, it should be `FixedPoint(1)`. We only push the local TopK to the bottom outer join, so idempotence will always be broken if more than one outer join matches this rule. I left some comments about this.
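Why a single pass matters can be seen on a toy model. The Scala sketch below is not Catalyst code: `Scan`, `LeftOuterJoin`, `LocalTopK` and `ToyPushLocalTopK` are hypothetical stand-ins, the TopK above the plan is represented only by its `k`, and the guard `k < maxRows` mirrors the quoted `smallThan(limits, leftChild.maxRowsPerPartition)`. On a chain of two left outer joins, the first pass wraps the left input of the bottom join; on a second pass that join no longer qualifies (its left side is already capped at `k`), so the join above it becomes the new bottom-most candidate and the plan changes again. Spark's `RuleExecutor` is understood to check `Once` batches for idempotence during tests, which is presumably why `FixedPoint(1)` is used rather than `Once`.

```scala
// Toy plan model; hypothetical names, not Catalyst classes.
object ToyPushLocalTopK {
  sealed trait Plan { def maxRows: Option[Long] }
  final case class Scan(name: String) extends Plan {
    def maxRows: Option[Long] = None
  }
  final case class LocalTopK(k: Int, child: Plan) extends Plan {
    def maxRows: Option[Long] = Some(k.toLong)
  }
  final case class LeftOuterJoin(left: Plan, right: Plan) extends Plan {
    def maxRows: Option[Long] = None // an outer join can multiply rows
  }

  // Mirrors smallThan: push only if the left side is not already capped at k.
  private def qualifies(j: LeftOuterJoin, k: Int): Boolean = j.left.maxRows.forall(k < _)

  // Walk the left spine and return the bottom-most qualifying join.
  private def findBottom(k: Int, p: Plan): Option[LeftOuterJoin] = p match {
    case j: LeftOuterJoin if qualifies(j, k) => findBottom(k, j.left).orElse(Some(j))
    case _ => None
  }

  // Rebuild the tree, wrapping the target join's left child in LocalTopK.
  private def rewrite(p: Plan, target: LeftOuterJoin, k: Int): Plan = p match {
    case j: LeftOuterJoin if j eq target => LeftOuterJoin(LocalTopK(k, j.left), j.right)
    case LeftOuterJoin(l, r) => LeftOuterJoin(rewrite(l, target, k), rewrite(r, target, k))
    case LocalTopK(n, c) => LocalTopK(n, rewrite(c, target, k))
    case other => other
  }

  // One application of the rule for a TopK(k) sitting on top of `plan`.
  def applyOnce(k: Int, plan: Plan): Plan =
    findBottom(k, plan).map(target => rewrite(plan, target, k)).getOrElse(plan)
}
```

Applying `applyOnce` twice to `LeftOuterJoin(LeftOuterJoin(Scan("a"), Scan("b")), Scan("c"))` yields two different plans, and only the third application leaves the plan unchanged.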
ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918867434

##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,118 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through other join:
+ *  - for a left outer join, the references of ordering of TopK come from the left side and
+ *    the limits of TopK is smaller than left side max rows
+ *  - for a right outer join, the references of ordering of TopK come from the right side and
+ *    the limits of TopK is smaller than right side max rows
+ */
+object PushLocalTopKThroughOuterJoin extends Rule[LogicalPlan] {
+  private def smallThan(limits: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
+    case Some(maxRows) => limits < maxRows
+    case _ => true
+  }
+
+  private def canPushThroughOuterJoin(
+      joinType: JoinType,
+      order: Seq[SortOrder],
+      leftChild: LogicalPlan,
+      rightChild: LogicalPlan,
+      limits: Int): Boolean = joinType match {
+    case LeftOuter =>
+      order.forall(_.references.subsetOf(leftChild.outputSet)) &&
+        smallThan(limits, leftChild.maxRowsPerPartition)
+    case RightOuter =>
+      order.forall(_.references.subsetOf(rightChild.outputSet)) &&
+        smallThan(limits, rightChild.maxRowsPerPartition)
+    case _ => false
+  }
+
+  private def findOuterJoin(limits: Int, order: Seq[SortOrder], child: LogicalPlan): Seq[Join] = {
+    child match {
+      case j @ ExtractEquiJoinKeys(joinType, _, _, _, _, leftChild, rightChild, _)
+          if canPushThroughOuterJoin(joinType, order, leftChild, rightChild,
+            limits) =>
+        // we should find the bottom outer join which can push local topK
+        val childOuterJoin = joinType match {
+          case LeftOuter => findOuterJoin(limits, order, leftChild)
+          case RightOuter => findOuterJoin(limits, order, rightChild)
+          case _ => Seq.empty
+        }
+        if (childOuterJoin.nonEmpty) {
+          childOuterJoin
+        } else {
+          j :: Nil
+        }
+      case u: Union => u.children.flatMap(child => findOuterJoin(limits, order, child))
+      case p: Project if p.projectList.forall(_.deterministic) =>
+        findOuterJoin(limits, order, p.child)
+      case f: Filter if f.condition.deterministic =>
+        findOuterJoin(limits, order, f.child)

Review Comment:
Good catch, somehow I put `Filter` here; I removed it. But this rule can still push through a filter thanks to the `PushPredicateThroughJoin` rule. I added some tests for this.
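Why traversing a `Filter` in `findOuterJoin` is unsafe, presumably the point of the catch above: it places the local TopK below the filter, so the filter can discard rows after the limit has been applied and fewer than `k` rows may survive. A minimal Scala collections sketch on made-up data, with `sorted.take(k)` standing in for a TopK:

```scala
// Made-up data illustrating why a limit cannot move below a filter.
object LimitBelowFilter {
  val rows: Seq[Int] = Seq(8, 3, 6, 1, 4, 7, 2, 5)
  val k = 3
  def keep(x: Int): Boolean = x % 2 == 0

  // Correct order: Filter first, then TopK (sort + limit).
  val correct: Seq[Int] = rows.filter(keep).sorted.take(k)

  // Wrong order: TopK pushed below the Filter.
  val pushedBelowFilter: Seq[Int] = rows.sorted.take(k).filter(keep)
}
```

Here `correct` is `Seq(2, 4, 6)` while `pushedBelowFilter` is only `Seq(2)`. Where `PushPredicateThroughJoin` has already moved such a predicate below the join, no `Filter` is left between the TopK and the join, which fits the reply's point.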
ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918864202

##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,118 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through other join:
+ *  - for a left outer join, the references of ordering of TopK come from the left side and
+ *    the limits of TopK is smaller than left side max rows
+ *  - for a right outer join, the references of ordering of TopK come from the right side and
+ *    the limits of TopK is smaller than right side max rows
+ */
+object PushLocalTopKThroughOuterJoin extends Rule[LogicalPlan] {
+  private def smallThan(limits: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
+    case Some(maxRows) => limits < maxRows
+    case _ => true
+  }
+
+  private def canPushThroughOuterJoin(
+      joinType: JoinType,
+      order: Seq[SortOrder],
+      leftChild: LogicalPlan,
+      rightChild: LogicalPlan,
+      limits: Int): Boolean = joinType match {
+    case LeftOuter =>
+      order.forall(_.references.subsetOf(leftChild.outputSet)) &&
+        smallThan(limits, leftChild.maxRowsPerPartition)
+    case RightOuter =>
+      order.forall(_.references.subsetOf(rightChild.outputSet)) &&
+        smallThan(limits, rightChild.maxRowsPerPartition)
+    case _ => false
+  }
+
+  private def findOuterJoin(limits: Int, order: Seq[SortOrder], child: LogicalPlan): Seq[Join] = {
+    child match {
+      case j @ ExtractEquiJoinKeys(joinType, _, _, _, _, leftChild, rightChild, _)
+          if canPushThroughOuterJoin(joinType, order, leftChild, rightChild,
+            limits) =>
+        // we should find the bottom outer join which can push local topK
+        val childOuterJoin = joinType match {
+          case LeftOuter => findOuterJoin(limits, order, leftChild)
+          case RightOuter => findOuterJoin(limits, order, rightChild)
+          case _ => Seq.empty
+        }
+        if (childOuterJoin.nonEmpty) {
+          childOuterJoin
+        } else {
+          j :: Nil
+        }
+      case u: Union => u.children.flatMap(child => findOuterJoin(limits, order, child))
+      case p: Project if p.projectList.forall(_.deterministic) =>
+        findOuterJoin(limits, order, p.child)
+      case f: Filter if f.condition.deterministic =>
+        findOuterJoin(limits, order, f.child)
+      case r: RepartitionByExpression if r.partitionExpressions.forall(_.deterministic) =>
+        findOuterJoin(limits, order, r.child)
+      case r: RebalancePartitions if r.partitionExpressions.forall(_.deterministic) =>
+        findOuterJoin(limits, order, r.child)
+      case r: Repartition => findOuterJoin(limits, order, r.child)
+      case _ => Seq.empty
+    }
+  }
+
+  private def pushLocalTopK(limits: Int, order: Seq[SortOrder], join: Join): Join = {
+    val (newLeft, newRight) = join.joinType match {
+      case LeftOuter =>
+        (LocalLimit(Literal(limits), Sort(order, false, join.left)), join.right)

Review Comment:
This new rule is not simple enough, so I prefer to keep it independent from `LimitPushDown`. Do you have any other concerns about merging it into `LimitPushDown`?
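The quoted `LeftOuter` branch of `pushLocalTopK` wraps the left child in `LocalLimit(Literal(limits), Sort(order, false, join.left))`, i.e. a per-partition sort and limit, while the original TopK stays above the join (hence "local" TopK). This is safe because every left row yields at least one row of a left outer join and the ordering only references left columns, so the top `k` ordering-key values of the join output can only come from rows in the top `k` of their own partition. A small Scala collections check on made-up data, with partitions modeled as nested `Seq`s; all names are hypothetical:

```scala
// Toy check of pushing a local TopK into the left side of a left outer join.
object LocalTopKThroughLeftOuterJoin {
  final case class L(key: Int, id: String)
  final case class R(key: Int, v: String)

  // Left outer join on key: every left row yields at least one output row.
  def leftOuterJoin(left: Seq[L], right: Seq[R]): Seq[(L, Option[R])] =
    left.flatMap { l =>
      val matches: Seq[Option[R]] = right.filter(_.key == l.key).map(Some(_))
      val padded: Seq[Option[R]] = if (matches.isEmpty) Seq(None) else matches
      padded.map(r => (l, r))
    }

  // Global TopK ordered only by a left-side column.
  def topK(k: Int, rows: Seq[(L, Option[R])]): Seq[(L, Option[R])] =
    rows.sortBy(_._1.key).take(k)

  // Per-partition Sort(global = false) followed by LocalLimit(k).
  def localTopK(k: Int, partitions: Seq[Seq[L]]): Seq[Seq[L]] =
    partitions.map(_.sortBy(_.key).take(k))
}
```

The check compares ordering keys rather than whole rows, because rows that tie on the sort key may legitimately be chosen differently with and without the push-down.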