[GitHub] [spark] ulysses-you commented on a diff in pull request #37129: [SPARK-39710][SQL] Support push local topK through outer join

2022-08-09 Thread GitBox


ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r941163219


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through outer join:
+ *   - for a left outer join, the references of ordering of TopK come from the left side and
+ *     the limits of TopK is smaller than left side max rows
+ *   - for a right outer join, the references of ordering of TopK come from the right side and
+ *     the limits of TopK is smaller than right side max rows
+ *
+ * Note that, this rule only push down local topK to the bottom outer join which is different with
+ * [[LimitPushDown]]. This is to avoid regression due to the overhead of local sort.

Review Comment:
   If the children are not shuffles, how about estimating a `minRowCount` and 
only pushing down when the TopK limit is much smaller than that `minRowCount`? 
For example, the `minRowCount` of a left outer join would be the row count of 
its left side. I think there should be almost no performance-regression corner 
cases if we can actually save a lot of data before the join.
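
A minimal sketch of the check suggested above, written as plain Scala rather than Catalyst code. The names `shouldPushDown`, `estimatedMinRows`, and `ratio`, and the default ratio of 100, are assumptions for illustration and do not come from the PR.

```scala
object TopKPushDownHeuristic {
  /**
   * Returns true when the TopK limit is much smaller than the estimated
   * minimum row count of the preserved side of the outer join.
   * `ratio` is a hypothetical knob for "much smaller".
   */
  def shouldPushDown(
      limit: Int,
      estimatedMinRows: Option[Long],
      ratio: Long = 100L): Boolean = estimatedMinRows match {
    // Widen to Long before multiplying so a large limit cannot overflow.
    case Some(rows) => limit.toLong * ratio <= rows
    // Without an estimate, stay conservative and do not push down.
    case None => false
  }
}
```

With these assumed defaults, a limit of 10 would be pushed down over a side estimated at one million rows, but not over a side estimated at 500 rows.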



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] ulysses-you commented on a diff in pull request #37129: [SPARK-39710][SQL] Support push local topK through outer join

2022-08-09 Thread GitBox


ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r941137063


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,119 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through outer join:
+ *   - for a left outer join, the references of ordering of TopK come from the left side and
+ *     the limits of TopK is smaller than left side max rows
+ *   - for a right outer join, the references of ordering of TopK come from the right side and
+ *     the limits of TopK is smaller than right side max rows
+ *
+ * Note that, this rule only push down local topK to the bottom outer join which is different with
+ * [[LimitPushDown]]. This is to avoid regression due to the overhead of local sort.

Review Comment:
   Yes, it will hit a small regression if the actual row count is smaller than 
the configured threshold. In that case we do one more local sort but cannot 
reduce the data size. So we can avoid the regression by configuring a small 
enough threshold.
   
   Or, to be more conservative, we can put this rule into the AQE optimizer. 
Then we can decide whether to push down according to the accurate row count.
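
To make the regression case concrete, here is a small sketch that models a partition as a plain Scala collection instead of Spark operators. `localTopK` is a hypothetical helper standing in for a per-partition sort followed by a local limit.

```scala
object LocalTopKEffect {
  // Models a per-partition local sort followed by a local limit.
  def localTopK(partition: Seq[Int], limit: Int): Seq[Int] =
    partition.sorted.take(limit)
}
```

When the partition holds fewer rows than the limit, `localTopK` returns every row, so the sort is paid for without shrinking the join input. When the partition is larger than the limit, the output is cut down to `limit` rows.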






[GitHub] [spark] ulysses-you commented on a diff in pull request #37129: [SPARK-39710][SQL] Support push local topK through outer join

2022-07-12 Thread GitBox


ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918870960


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,118 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through other join:
+ *   - for a left outer join, the references of ordering of TopK come from the left side and
+ *     the limits of TopK is smaller than left side max rows
+ *   - for a right outer join, the references of ordering of TopK come from the right side and
+ *     the limits of TopK is smaller than right side max rows
+ */
+object PushLocalTopKThroughOuterJoin extends Rule[LogicalPlan] {
+  private def smallThan(limits: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
+    case Some(maxRows) => limits < maxRows
+    case _ => true

Review Comment:
   Yes, that's why I added the new config to forbid pushing down a big limit. 
The worst case is then one more local sort bounded by that limit, which has 
almost no impact.
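
A hedged sketch of how such a guard could combine with `smallThan` from the diff. The config key and its default are not shown in this thread, so `pushDownLimitThreshold` and its value of 1000 are placeholders, and `canPush` is a hypothetical helper.

```scala
object LimitThresholdGuard {
  // Placeholder for the new config mentioned above; the real key and
  // default value are not visible in this thread.
  val pushDownLimitThreshold: Int = 1000

  // Same logic as `smallThan` in the diff: an unknown max row count
  // is treated as "the limit is smaller".
  def smallThan(limit: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
    case Some(maxRows) => limit < maxRows
    case None => true
  }

  // Push down only when the limit is under the threshold and is
  // smaller than the known max row count, if there is one.
  def canPush(limit: Int, maxRowsOpt: Option[Long]): Boolean =
    limit <= pushDownLimitThreshold && smallThan(limit, maxRowsOpt)
}
```

Under these assumptions, a limit above the threshold is never pushed down, even when the max row count is unknown, which bounds the cost of the extra local sort.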






[GitHub] [spark] ulysses-you commented on a diff in pull request #37129: [SPARK-39710][SQL] Support push local topK through outer join

2022-07-12 Thread GitBox


ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918865962


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -230,6 +230,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // non-nullable when an empty relation child of a Union is removed
       UpdateAttributeNullability) :+
     Batch("Optimize One Row Plan", fixedPoint, OptimizeOneRowPlan) :+
+    Batch("Push Local TopK Through Outer Join", FixedPoint(1), PushLocalTopKThroughOuterJoin) :+

Review Comment:
   Actually, it should be `FixedPoint(1)`. We only push the local TopK to the 
bottom outer join, so idempotence will always be broken if more than one outer 
join matches this rule. I left some comments about this.
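
As a rough illustration of what "the bottom outer join" means here, the sketch below uses a toy plan tree instead of Catalyst classes. `Scan`, `LeftOuterJoin`, `Project`, and `findBottomJoin` are hypothetical names, and only left outer joins are modelled.

```scala
object BottomOuterJoinSketch {
  sealed trait Plan
  case class Scan(name: String) extends Plan
  case class LeftOuterJoin(left: Plan, right: Plan) extends Plan
  case class Project(child: Plan) extends Plan

  // Walk down the preserved (left) side and return the deepest left
  // outer join; fall back to the current join when nothing deeper matches.
  def findBottomJoin(plan: Plan): Option[LeftOuterJoin] = plan match {
    case j @ LeftOuterJoin(left, _) => findBottomJoin(left).orElse(Some(j))
    case Project(child) => findBottomJoin(child)
    case _ => None
  }
}
```

For two stacked left outer joins, `findBottomJoin` returns the inner one, so a single pass touches only that join and leaves the upper join unchanged.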






[GitHub] [spark] ulysses-you commented on a diff in pull request #37129: [SPARK-39710][SQL] Support push local topK through outer join

2022-07-12 Thread GitBox


ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918867434


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,118 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through other join:
+ *   - for a left outer join, the references of ordering of TopK come from the left side and
+ *     the limits of TopK is smaller than left side max rows
+ *   - for a right outer join, the references of ordering of TopK come from the right side and
+ *     the limits of TopK is smaller than right side max rows
+ */
+object PushLocalTopKThroughOuterJoin extends Rule[LogicalPlan] {
+  private def smallThan(limits: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
+    case Some(maxRows) => limits < maxRows
+    case _ => true
+  }
+
+  private def canPushThroughOuterJoin(
+      joinType: JoinType,
+      order: Seq[SortOrder],
+      leftChild: LogicalPlan,
+      rightChild: LogicalPlan,
+      limits: Int): Boolean = joinType match {
+    case LeftOuter =>
+      order.forall(_.references.subsetOf(leftChild.outputSet)) &&
+        smallThan(limits, leftChild.maxRowsPerPartition)
+    case RightOuter =>
+      order.forall(_.references.subsetOf(rightChild.outputSet)) &&
+        smallThan(limits, rightChild.maxRowsPerPartition)
+    case _ => false
+  }
+
+  private def findOuterJoin(limits: Int, order: Seq[SortOrder], child: LogicalPlan): Seq[Join] = {
+    child match {
+      case j @ ExtractEquiJoinKeys(joinType, _, _, _, _, leftChild, rightChild, _)
+          if canPushThroughOuterJoin(joinType, order, leftChild, rightChild, limits) =>
+        // we should find the bottom outer join which can push local topK
+        val childOuterJoin = joinType match {
+          case LeftOuter => findOuterJoin(limits, order, leftChild)
+          case RightOuter => findOuterJoin(limits, order, rightChild)
+          case _ => Seq.empty
+        }
+        if (childOuterJoin.nonEmpty) {
+          childOuterJoin
+        } else {
+          j :: Nil
+        }
+      case u: Union => u.children.flatMap(child => findOuterJoin(limits, order, child))
+      case p: Project if p.projectList.forall(_.deterministic) =>
+        findOuterJoin(limits, order, p.child)
+      case f: Filter if f.condition.deterministic =>
+        findOuterJoin(limits, order, f.child)

Review Comment:
   Good catch. Somehow I put Filter here; I have removed it.
   
   But this rule can still push through a filter thanks to the rule 
`PushPredicateThroughJoin`. I added some tests for this.
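
The thread does not spell out why Filter was dropped from `findOuterJoin`. One likely reason is that a limit placed below a filter can discard rows the filter would have kept. The sketch below shows this with plain Scala collections; all names in it are hypothetical and it does not use Spark.

```scala
object FilterAndLocalLimit {
  // Sort ascending and keep the first `limit` rows.
  def topK(rows: Seq[Int], limit: Int): Seq[Int] = rows.sorted.take(limit)

  // Intended semantics: filter first, then take the top K.
  def filterThenTopK(rows: Seq[Int], limit: Int)(pred: Int => Boolean): Seq[Int] =
    topK(rows.filter(pred), limit)

  // A top K pushed below the filter, with the final top K applied to
  // whatever survives the filter.
  def topKBelowFilter(rows: Seq[Int], limit: Int)(pred: Int => Boolean): Seq[Int] =
    topK(topK(rows, limit).filter(pred), limit)
}
```

For the rows 1 to 10, a limit of 3, and a predicate that keeps even numbers, `filterThenTopK` yields 2, 4, 6 while `topKBelowFilter` yields only 2. Once `PushPredicateThroughJoin` has moved the predicate below the join, no Filter remains between the TopK and the join, so this hazard does not arise.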






[GitHub] [spark] ulysses-you commented on a diff in pull request #37129: [SPARK-39710][SQL] Support push local topK through outer join

2022-07-12 Thread GitBox


ulysses-you commented on code in PR #37129:
URL: https://github.com/apache/spark/pull/37129#discussion_r918864202


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushLocalTopKThroughOuterJoin.scala:
##
@@ -0,0 +1,118 @@
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions.{Literal, SortOrder}
+import org.apache.spark.sql.catalyst.planning.{ExtractEquiJoinKeys, ExtractTopK}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LocalLimit, LogicalPlan, Project, RebalancePartitions, Repartition, RepartitionByExpression, Sort, Union}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.catalyst.trees.TreePattern.{LIMIT, OUTER_JOIN, SORT}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule supports push down local limit and local sort from TopK through other join:
+ *   - for a left outer join, the references of ordering of TopK come from the left side and
+ *     the limits of TopK is smaller than left side max rows
+ *   - for a right outer join, the references of ordering of TopK come from the right side and
+ *     the limits of TopK is smaller than right side max rows
+ */
+object PushLocalTopKThroughOuterJoin extends Rule[LogicalPlan] {
+  private def smallThan(limits: Int, maxRowsOpt: Option[Long]): Boolean = maxRowsOpt match {
+    case Some(maxRows) => limits < maxRows
+    case _ => true
+  }
+
+  private def canPushThroughOuterJoin(
+      joinType: JoinType,
+      order: Seq[SortOrder],
+      leftChild: LogicalPlan,
+      rightChild: LogicalPlan,
+      limits: Int): Boolean = joinType match {
+    case LeftOuter =>
+      order.forall(_.references.subsetOf(leftChild.outputSet)) &&
+        smallThan(limits, leftChild.maxRowsPerPartition)
+    case RightOuter =>
+      order.forall(_.references.subsetOf(rightChild.outputSet)) &&
+        smallThan(limits, rightChild.maxRowsPerPartition)
+    case _ => false
+  }
+
+  private def findOuterJoin(limits: Int, order: Seq[SortOrder], child: LogicalPlan): Seq[Join] = {
+    child match {
+      case j @ ExtractEquiJoinKeys(joinType, _, _, _, _, leftChild, rightChild, _)
+          if canPushThroughOuterJoin(joinType, order, leftChild, rightChild, limits) =>
+        // we should find the bottom outer join which can push local topK
+        val childOuterJoin = joinType match {
+          case LeftOuter => findOuterJoin(limits, order, leftChild)
+          case RightOuter => findOuterJoin(limits, order, rightChild)
+          case _ => Seq.empty
+        }
+        if (childOuterJoin.nonEmpty) {
+          childOuterJoin
+        } else {
+          j :: Nil
+        }
+      case u: Union => u.children.flatMap(child => findOuterJoin(limits, order, child))
+      case p: Project if p.projectList.forall(_.deterministic) =>
+        findOuterJoin(limits, order, p.child)
+      case f: Filter if f.condition.deterministic =>
+        findOuterJoin(limits, order, f.child)
+      case r: RepartitionByExpression if r.partitionExpressions.forall(_.deterministic) =>
+        findOuterJoin(limits, order, r.child)
+      case r: RebalancePartitions if r.partitionExpressions.forall(_.deterministic) =>
+        findOuterJoin(limits, order, r.child)
+      case r: Repartition => findOuterJoin(limits, order, r.child)
+      case _ => Seq.empty
+    }
+  }
+
+  private def pushLocalTopK(limits: Int, order: Seq[SortOrder], join: Join): Join = {
+    val (newLeft, newRight) = join.joinType match {
+      case LeftOuter =>
+        (LocalLimit(Literal(limits), Sort(order, false, join.left)), join.right)

Review Comment:
   This new rule is not simple, so I prefer to keep it independent from 
LimitPushDown. Do you have any other concerns about merging it into 
LimitPushDown?


