[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198365370
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  // Extract a list of logical plans to be joined for join-order comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+    : Seq[LogicalPlan] = plan match {
+    case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left)
+    case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => extractLeftDeepInnerJoins(j)
+    case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+    extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+      originalPlan: LogicalPlan,
+      input: Seq[(LogicalPlan, InnerLike)],
+      conditions: Seq[Expression]): LogicalPlan = {
+    val orderedJoins = createOrderedJoin(input, conditions)
+    if (!sameJoinOrder(orderedJoins, originalPlan)) {
+      if (originalPlan.output != orderedJoins.output) {
--- End diff --

I think opening another JIRA would be better: people discovering this
bug in earlier versions are more likely to find it if it is filed as an
independent issue.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198347568
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -84,19 +84,51 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  // Extract a list of logical plans to be joined for join-order comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+    : Seq[LogicalPlan] = plan match {
+    case j @ Join(left, right, _: InnerLike, _) => right +: extractLeftDeepInnerJoins(left)
+    case Filter(_, child) => extractLeftDeepInnerJoins(child)
+    case Project(_, child) => extractLeftDeepInnerJoins(child)
+    case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+    extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+      originalPlan: LogicalPlan,
+      input: Seq[(LogicalPlan, InnerLike)],
+      conditions: Seq[Expression]): LogicalPlan = {
+    val orderedJoins = createOrderedJoin(input, conditions)
+    if (!sameJoinOrder(orderedJoins, originalPlan)) {
+      if (originalPlan.output != orderedJoins.output) {
+        // Keep the same output attributes and the order
+        Project(originalPlan.output, orderedJoins)
+      } else {
+        orderedJoins
+      }
+    } else {
+      originalPlan
+    }
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case ExtractFiltersAndInnerJoins(input, conditions)
+    case p @ ExtractFiltersAndInnerJoins(input, conditions)
         if input.size > 2 && conditions.nonEmpty =>
       if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
         val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions)
         if (starJoinPlan.nonEmpty) {
           val rest = input.filterNot(starJoinPlan.contains(_))
-          createOrderedJoin(starJoinPlan ++ rest, conditions)
+          mayCreateOrderedJoin(p, starJoinPlan ++ rest, conditions)
         } else {
-          createOrderedJoin(input, conditions)
+          mayCreateOrderedJoin(p, input, conditions)
         }
       } else {
-        createOrderedJoin(input, conditions)
+        mayCreateOrderedJoin(p, input, conditions)
       }
--- End diff --

I expanded the function logic inline at the end of `apply` because the logic is
small enough:
https://github.com/apache/spark/pull/20345/files#diff-17d31b198ff391188311550fcabd1198R119


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198345839
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!sameJoinOrder(orderedJoins, originalPlan)) {
+  if (originalPlan.output != orderedJoins.output) {
--- End diff --

ah, yes, I got you. btw, do we need to file a separate JIRA? Would it be bad to
just describe the bug in this JIRA and this PR?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198341155
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,51 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case Filter(_, child) => extractLeftDeepInnerJoins(child)
+case Project(_, child) => extractLeftDeepInnerJoins(child)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!sameJoinOrder(orderedJoins, originalPlan)) {
+  if (originalPlan.output != orderedJoins.output) {
+// Keep the same output attributes and the order
+Project(originalPlan.output, orderedJoins)
+  } else {
+orderedJoins
+  }
+} else {
+  originalPlan
+}
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case ExtractFiltersAndInnerJoins(input, conditions)
+case p @ ExtractFiltersAndInnerJoins(input, conditions)
 if input.size > 2 && conditions.nonEmpty =>
   if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
 val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, 
conditions)
 if (starJoinPlan.nonEmpty) {
   val rest = input.filterNot(starJoinPlan.contains(_))
-  createOrderedJoin(starJoinPlan ++ rest, conditions)
+  mayCreateOrderedJoin(p, starJoinPlan ++ rest, conditions)
 } else {
-  createOrderedJoin(input, conditions)
+  mayCreateOrderedJoin(p, input, conditions)
 }
   } else {
-createOrderedJoin(input, conditions)
+mayCreateOrderedJoin(p, input, conditions)
   }
--- End diff --

ok, I'll brush up the code based on the suggestion.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198209435
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!sameJoinOrder(orderedJoins, originalPlan)) {
+  if (originalPlan.output != orderedJoins.output) {
--- End diff --

I mean the original join re-ordering before your fix did not apply a Project,
and that must be a bug, as illustrated by your change to JoinReorderSuite. I
think we should file it and mark it as fixed by this PR.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198208026
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,51 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case Filter(_, child) => extractLeftDeepInnerJoins(child)
+case Project(_, child) => extractLeftDeepInnerJoins(child)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!sameJoinOrder(orderedJoins, originalPlan)) {
+  if (originalPlan.output != orderedJoins.output) {
+// Keep the same output attributes and the order
+Project(originalPlan.output, orderedJoins)
+  } else {
+orderedJoins
+  }
+} else {
+  originalPlan
+}
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-case ExtractFiltersAndInnerJoins(input, conditions)
+case p @ ExtractFiltersAndInnerJoins(input, conditions)
 if input.size > 2 && conditions.nonEmpty =>
   if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
 val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, 
conditions)
 if (starJoinPlan.nonEmpty) {
   val rest = input.filterNot(starJoinPlan.contains(_))
-  createOrderedJoin(starJoinPlan ++ rest, conditions)
+  mayCreateOrderedJoin(p, starJoinPlan ++ rest, conditions)
 } else {
-  createOrderedJoin(input, conditions)
+  mayCreateOrderedJoin(p, input, conditions)
 }
   } else {
-createOrderedJoin(input, conditions)
+mayCreateOrderedJoin(p, input, conditions)
   }
--- End diff --

How about making it like:
```
      val joinReorderedPlan = if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
        val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions)
        if (starJoinPlan.nonEmpty) {
          val rest = input.filterNot(starJoinPlan.contains(_))
          createOrderedJoin(starJoinPlan ++ rest, conditions)
        } else {
          createOrderedJoin(input, conditions)
        }
      } else {
        createOrderedJoin(input, conditions)
      }
      projectIfNecessary(joinReorderedPlan, p)
```
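
For readers of the archive: `projectIfNecessary` is not defined anywhere in this thread, so the
following is only a minimal sketch of what such a helper might look like, assuming it should
restore the original output attributes (and their order) only when reordering changed them:

```
// Hypothetical helper (an assumption, not part of the PR as posted): wrap the reordered
// join in a Project only when the output attributes or their order changed.
private def projectIfNecessary(joinReorderedPlan: LogicalPlan, originalPlan: LogicalPlan): LogicalPlan = {
  if (originalPlan.output != joinReorderedPlan.output) {
    // Keep the original output attributes and their order.
    Project(originalPlan.output, joinReorderedPlan)
  } else {
    joinReorderedPlan
  }
}
```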


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198206001
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
--- End diff --

Sorry, I didn't make myself very clear. I was suggesting something like
`projectIfNecessary(joinReorderedPlan: LogicalPlan, originalPlan: LogicalPlan)`.
Please also refer to the comment below.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198059157
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
 }
 
 /**
- * A pattern that collects the filter and inner joins.
+ * A pattern that collects the filter and inner joins and skip projections with attributes only.
  *
  *          Filter
  *            |
  *        inner Join
  *          /    \            ---->      (Seq(plan0, plan1, plan2), conditions)
  *      Filter   plan2
  *        |
+ *    Project
--- End diff --

I fixed the bug you pointed out and added two tests for it. Could you
check again? Thanks!

https://github.com/apache/spark/pull/20345/files#diff-6c81baa433aea4741c60c00734f31406R191


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198052416
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 }
 
 /**
- * A pattern that collects the filter and inner joins.
+ * A pattern that collects the filter and inner joins and skip projections 
with attributes only.
  *
  *  Filter
  *|
  *inner Join
  *  /\>  (Seq(plan0, plan1, plan2), 
conditions)
  *  Filter   plan2
  *|
+ *  Project
--- End diff --

yea, good catch. Thanks! I'll fix soon.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198043685
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!sameJoinOrder(orderedJoins, originalPlan)) {
+  if (originalPlan.output != orderedJoins.output) {
--- End diff --

What's the bug you pointed out here?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198043470
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -81,14 +92,14 @@ class JoinOptimizationSuite extends PlanTest {
 testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
   Some((Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr
 
-testExtractCheckCross(x.join(y, Cross), Some((Seq((x, Cross), (y, 
Cross)), Seq(
-testExtractCheckCross(x.join(y, Cross).join(z, Cross),
+testExtractInnerJoins(x.join(y, Cross), Some((Seq((x, Cross), (y, 
Cross)), Seq(
+testExtractInnerJoins(x.join(y, Cross).join(z, Cross),
   Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq(
-testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
+testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
   Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq("x.b".attr === 
"y.d".attr
-testExtractCheckCross(x.join(y, Inner, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
+testExtractInnerJoins(x.join(y, Inner, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
   Some((Seq((x, Inner), (y, Inner), (z, Cross)), Seq("x.b".attr === 
"y.d".attr
-testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Inner),
+testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Inner),
   Some((Seq((x, Cross), (y, Cross), (z, Inner)), Seq("x.b".attr === 
"y.d".attr
   }
--- End diff --

ok, I'll try to add them.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-26 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198043374
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
--- End diff --

I chose this name because the main purpose of the function is to order joins;
IMHO the projection is not the main task here.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-25 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198029508
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
--- End diff --

Call this "projectIfNecessary"?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-25 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198031184
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!sameJoinOrder(orderedJoins, originalPlan)) {
+  if (originalPlan.output != orderedJoins.output) {
--- End diff --

I think this also fixes an existing bug for join reordering, right? Shall 
we add it to the issue description?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-25 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198029399
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 }
 
 /**
- * A pattern that collects the filter and inner joins.
+ * A pattern that collects the filter and inner joins and skip projections 
with attributes only.
  *
  *  Filter
  *|
  *inner Join
  *  /\>  (Seq(plan0, plan1, plan2), 
conditions)
  *  Filter   plan2
  *|
+ *  Project
--- End diff --

I don't think this case would be covered, i.e., Project-over-Filter or 
Filter-over-Project.
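
An illustrative shape of what is meant here, sketched with the catalyst test DSL and the
suite's `x`/`y`/`z` test relations (not code from the PR): a `Project` sitting on a `Filter`
on a `Join` is matched by neither the `Project(_, Join(...))` nor the `Filter(_, Join(...))`
case, so flattening would stop there:

```
// Illustrative only: Project-over-Filter-over-Join, which the pattern as written
// above would not look through when flattening the join tree.
val plan = x.join(y, Inner, Some($"x.b" === $"y.d"))
  .where($"x.a" > 1)             // Filter directly on top of the join
  .select($"x.a", $"y.d")        // Project on top of the Filter
  .join(z, Inner, Some($"z.a" === $"y.d"))
```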


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-06-25 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r198030263
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -81,14 +92,14 @@ class JoinOptimizationSuite extends PlanTest {
 testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
   Some((Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr
 
-testExtractCheckCross(x.join(y, Cross), Some((Seq((x, Cross), (y, 
Cross)), Seq(
-testExtractCheckCross(x.join(y, Cross).join(z, Cross),
+testExtractInnerJoins(x.join(y, Cross), Some((Seq((x, Cross), (y, 
Cross)), Seq(
+testExtractInnerJoins(x.join(y, Cross).join(z, Cross),
   Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq(
-testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
+testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
   Some((Seq((x, Cross), (y, Cross), (z, Cross)), Seq("x.b".attr === 
"y.d".attr
-testExtractCheckCross(x.join(y, Inner, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
+testExtractInnerJoins(x.join(y, Inner, Some("x.b".attr === 
"y.d".attr)).join(z, Cross),
   Some((Seq((x, Inner), (y, Inner), (z, Cross)), Seq("x.b".attr === 
"y.d".attr
-testExtractCheckCross(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Inner),
+testExtractInnerJoins(x.join(y, Cross, Some("x.b".attr === 
"y.d".attr)).join(z, Inner),
   Some((Seq((x, Cross), (y, Cross), (z, Inner)), Seq("x.b".attr === 
"y.d".attr
   }
--- End diff --

It would be worth adding tests with "where" and "select" together if the 
scenario I mentioned above can be implemented.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180616339
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -145,4 +161,55 @@ class JoinOptimizationSuite extends PlanTest {
 }
 assert(broadcastChildren.size == 1)
   }
+
+  test("SPARK-23172 skip projections when flattening joins") {
+val x = testRelation.subquery('x)
+val y = testRelation1.subquery('y)
+val z = testRelation.subquery('z)
+val joined = x.join(z, Inner, Some($"x.b" === $"z.b")).select($"x.a", 
$"z.a", $"z.c")
+  .join(y, Inner, Some($"y.d" === $"z.a")).analyze
+val expectedTables = joined.collectLeaves().map { case p => (p, Inner) 
}
+val expectedConditions = joined.collect { case Join(_, _, _, 
Some(conditions)) => conditions }
+testExtractInnerJoins(joined, Some((expectedTables, 
expectedConditions)))
+  }
+
+  test("SPARK-23172 reorder joins with projections") {
--- End diff --

ok


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180616142
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -116,7 +127,12 @@ class JoinOptimizationSuite extends PlanTest {
 )
 
 queryAnswers foreach { queryAnswerPair =>
-  val optimized = Optimize.execute(queryAnswerPair._1.analyze)
+  val optimized = Optimize.execute(queryAnswerPair._1.analyze) match {
+// `ReorderJoin` may add `Project` to keep the same order of 
output attributes.
+// So, we drop a top `Project` for tests.
+case project: Project => project.child
--- End diff --

yeah, great suggestion, and I think so too. I'll try to fix it.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180615479
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -172,17 +174,20 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
     case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) =>
       val (plans, conditions) = flattenJoin(j)
       (plans, conditions ++ splitConjunctivePredicates(filterCondition))
-
+    case p @ Project(_, j @ Join(_, _, _: InnerLike, _))
+        // Keep flattening joins when the project has attributes only
+        if p.projectList.forall(_.isInstanceOf[Attribute]) =>
+      flattenJoin(j)
     case _ => (Seq((plan, parentJoinType)), Seq.empty)
   }
 
-  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]
-      = plan match {
-    case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, _)) =>
-      Some(flattenJoin(f))
-    case j @ Join(_, _, joinType, _) =>
-      Some(flattenJoin(j))
-    case _ => None
+  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] = {
+    val (plans, conditions) = flattenJoin(plan)
+    if (plans.size > 1) {
--- End diff --

aha, sounds good to me. Thanks!


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180615326
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -59,12 +75,7 @@ class JoinOptimizationSuite extends PlanTest {
   (noCartesian, seq_pair._2)
 }
   }
-  testExtractCheckCross(plan, expectedNoCross)
-}
-
-def testExtractCheckCross
-(plan: LogicalPlan, expected: Option[(Seq[(LogicalPlan, 
InnerLike)], Seq[Expression])]) {
-  assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
--- End diff --

If we have multiple conditions, we need to compare them as a `Set`.
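
For context, a minimal sketch of what the rewritten helper might do (an assumption based on this
comment; the real `testExtractInnerJoins` may differ): compare the extracted plans in order, but
compare the extracted conditions as a `Set`, since their order is not stable:

```
// Sketch only: plans compared in order, conditions compared as a Set.
private def testExtractInnerJoins(
    plan: LogicalPlan,
    expected: Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])]): Unit = {
  (ExtractFiltersAndInnerJoins.unapply(plan), expected) match {
    case (Some((actualPlans, actualConds)), Some((expectedPlans, expectedConds))) =>
      assert(actualPlans === expectedPlans)
      assert(actualConds.toSet === expectedConds.toSet)
    case (actual, _) =>
      assert(actual === expected)
  }
}
```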


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180613972
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
--- End diff --

ok


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180345994
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,50 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
+: Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
--- End diff --

how about `sameJoinOrder`?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180346211
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private def extractLeftDeepInnerJoins(plan: LogicalPlan): 
Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!checkSameJoinOrder(orderedJoins, originalPlan)) {
--- End diff --

ah, right, thanks!


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180327289
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -172,17 +174,20 @@ object ExtractFiltersAndInnerJoins extends 
PredicateHelper {
 case Filter(filterCondition, j @ Join(left, right, _: InnerLike, 
joinCondition)) =>
   val (plans, conditions) = flattenJoin(j)
   (plans, conditions ++ splitConjunctivePredicates(filterCondition))
-
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _))
+// Keep flattening joins when the project has attributes only
+if p.projectList.forall(_.isInstanceOf[Attribute]) =>
+  flattenJoin(j)
 case _ => (Seq((plan, parentJoinType)), Seq.empty)
   }
 
-  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], 
Seq[Expression])]
-  = plan match {
-case f @ Filter(filterCondition, j @ Join(_, _, joinType: InnerLike, 
_)) =>
-  Some(flattenJoin(f))
-case j @ Join(_, _, joinType, _) =>
-  Some(flattenJoin(j))
-case _ => None
+  def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], 
Seq[Expression])] = {
+val (plans, conditions) = flattenJoin(plan)
+if (plans.size > 1) {
--- End diff --

how about `plans.size > 2 && conditions.nonEmpty`? Then we can remove the
`if` condition
[here](https://github.com/apache/spark/pull/20345/files#diff-17d31b198ff391188311550fcabd1198R120)
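
A sketch of the suggested change (an assumption about its shape, mirroring the guard that
currently sits in `ReorderJoin.apply`):

```
// Sketch only: bail out of the pattern unless there are more than two plans and
// at least one condition, so ReorderJoin.apply no longer needs its own guard.
def unapply(plan: LogicalPlan): Option[(Seq[(LogicalPlan, InnerLike)], Seq[Expression])] = {
  val (plans, conditions) = flattenJoin(plan)
  if (plans.size > 2 && conditions.nonEmpty) {
    Some((plans, conditions))
  } else {
    None
  }
}
```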


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180344569
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -59,12 +75,7 @@ class JoinOptimizationSuite extends PlanTest {
   (noCartesian, seq_pair._2)
 }
   }
-  testExtractCheckCross(plan, expectedNoCross)
-}
-
-def testExtractCheckCross
-(plan: LogicalPlan, expected: Option[(Seq[(LogicalPlan, 
InnerLike)], Seq[Expression])]) {
-  assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
--- End diff --

since you rewrote this function, was the previous comparison logic wrong?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180328615
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -145,4 +161,55 @@ class JoinOptimizationSuite extends PlanTest {
 }
 assert(broadcastChildren.size == 1)
   }
+
+  test("SPARK-23172 skip projections when flattening joins") {
+val x = testRelation.subquery('x)
+val y = testRelation1.subquery('y)
+val z = testRelation.subquery('z)
+val joined = x.join(z, Inner, Some($"x.b" === $"z.b")).select($"x.a", 
$"z.a", $"z.c")
+  .join(y, Inner, Some($"y.d" === $"z.a")).analyze
+val expectedTables = joined.collectLeaves().map { case p => (p, Inner) 
}
+val expectedConditions = joined.collect { case Join(_, _, _, 
Some(conditions)) => conditions }
+testExtractInnerJoins(joined, Some((expectedTables, 
expectedConditions)))
+  }
+
+  test("SPARK-23172 reorder joins with projections") {
--- End diff --

The case can also happen without star schema detection enabled, right? Is it
possible to use a simpler case, like the one in the PR description?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180340667
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -46,6 +48,20 @@ class JoinOptimizationSuite extends PlanTest {
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
   val testRelation1 = LocalRelation('d.int)
 
+  def testExtractInnerJoins(
--- End diff --

private


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-04-10 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r180344118
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -116,7 +127,12 @@ class JoinOptimizationSuite extends PlanTest {
 )
 
 queryAnswers foreach { queryAnswerPair =>
-  val optimized = Optimize.execute(queryAnswerPair._1.analyze)
+  val optimized = Optimize.execute(queryAnswerPair._1.analyze) match {
+// `ReorderJoin` may add `Project` to keep the same order of 
output attributes.
+// So, we drop a top `Project` for tests.
+case project: Project => project.child
--- End diff --

I'm a little hesitant to change this, because if we really forgot to add a
Project node after join reordering, the test could still pass, and that would be wrong.
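
One possible alternative, only sketched here with hypothetical names (`query`,
`reorderedAnswer`): keep the optimized plan intact and put the expected `Project` into the
answer side, so that a forgotten `Project` makes the comparison fail rather than pass silently:

```
// Sketch only: the expected plan carries the Project that ReorderJoin should add.
val analyzedQuery = query.analyze
val expected = Project(analyzedQuery.output, reorderedAnswer.analyze)
comparePlans(Optimize.execute(analyzedQuery), expected)
```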


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-21 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r176027921
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -145,4 +161,55 @@ class JoinOptimizationSuite extends PlanTest {
 }
 assert(broadcastChildren.size == 1)
   }
+
+  test("SPARK-23172 skip projections when flattening joins") {
+val x = testRelation.subquery('x)
+val y = testRelation1.subquery('y)
+val z = testRelation.subquery('z)
+val joined = x.join(z, Inner, Some($"x.b" === $"z.b")).select($"x.a", 
$"z.a", $"z.c")
+  .join(y, Inner, Some($"y.d" === $"z.a")).analyze
+val expectedTables = joined.collectLeaves().map { case p => (p, Inner) 
}
+val expectedConditions = joined.collect { case Join(_, _, _, 
Some(conditions)) => conditions }
+testExtractInnerJoins(joined, Some((expectedTables, 
expectedConditions)))
+  }
+
+  test("SPARK-23172 reorder joins with projections") {
--- End diff --

@wzhfy Added this test.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175971417
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private def extractLeftDeepInnerJoins(plan: LogicalPlan): 
Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!checkSameJoinOrder(orderedJoins, originalPlan)) {
--- End diff --

If we don't have this check, `operatorOptimizationRuleSet` cannot reach
`fixedPoint`, because `ReorderJoin` is re-applied to the same join trees every
time the optimization rule batch is invoked. This case does not happen in master
because reordered joins have `Project` nodes as internal nodes (the `Project`s
are added by the following optimization rules, e.g., `ColumnPruning`), and that
plan structure guards against this case.
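
To make the convergence concern concrete, a small sketch (assuming some analyzed `plan` with
more than two inner-joined relations; `ReorderJoin` and `sameJoinOrder` are the names from the
PR):

```
// Sketch of the property the check buys: applying the rule to its own output must not
// keep producing new plans, otherwise the optimizer batch never converges.
val once = ReorderJoin(plan)
val twice = ReorderJoin(once)
assert(once.fastEquals(twice))  // idempotence, guarded by the sameJoinOrder check
```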


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175971428
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -145,4 +159,15 @@ class JoinOptimizationSuite extends PlanTest {
 }
 assert(broadcastChildren.size == 1)
   }
+
+  test("SPARK-23172 skip projections when flattening joins") {
--- End diff --

ok


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175971439
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 }
 
 /**
- * A pattern that collects the filter and inner joins.
+ * A pattern that collects the filter and inner joins (and skip 
projections in plan sub-trees).
--- End diff --

ok


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175725372
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -84,19 +84,49 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
 }
   }
 
+  // Extract a list of logical plans to be joined for join-order 
comparisons.
+  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
+  // the same strategy to extract the plan list.
+  private def extractLeftDeepInnerJoins(plan: LogicalPlan): 
Seq[LogicalPlan] = plan match {
+case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
+case p @ Project(_, j @ Join(_, _, _: InnerLike, _)) => 
extractLeftDeepInnerJoins(j)
+case _ => Seq(plan)
+  }
+
+  private def checkSameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
+extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
+  }
+
+  private def mayCreateOrderedJoin(
+  originalPlan: LogicalPlan,
+  input: Seq[(LogicalPlan, InnerLike)],
+  conditions: Seq[Expression]): LogicalPlan = {
+val orderedJoins = createOrderedJoin(input, conditions)
+if (!checkSameJoinOrder(orderedJoins, originalPlan)) {
--- End diff --

Is this check necessary? I think checking `originalPlan.output !=
orderedJoins.output` is enough, and faster.


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175696570
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -141,14 +141,16 @@ object ExtractEquiJoinKeys extends Logging with 
PredicateHelper {
 }
 
 /**
- * A pattern that collects the filter and inner joins.
+ * A pattern that collects the filter and inner joins (and skip 
projections in plan sub-trees).
--- End diff --

skip projections with attributes only


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175727668
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 ---
@@ -145,4 +159,15 @@ class JoinOptimizationSuite extends PlanTest {
 }
 assert(broadcastChildren.size == 1)
   }
+
+  test("SPARK-23172 skip projections when flattening joins") {
--- End diff --

Could you add a test case which would fail before the fix?


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175696187
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -172,17 +174,23 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
     case Filter(filterCondition, j @ Join(left, right, _: InnerLike, joinCondition)) =>
       val (plans, conditions) = flattenJoin(j)
       (plans, conditions ++ splitConjunctivePredicates(filterCondition))
-
+    case p @ Project(_, j @ Join(left, right, _: InnerLike, joinCondition)) =>
+      // Keep flattening joins when projects having attributes only
+      if (p.outputSet.subsetOf(j.outputSet)) {
--- End diff --

If we want to make sure the project has attributes only, should it be 
`p.projectList.forall(_.isInstanceOf[Attribute])`?
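
To make the difference between the two checks concrete, a small illustration with the catalyst
DSL (a sketch, not code from the PR):

```
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}

val x = LocalRelation('a.int, 'b.int)
val y = LocalRelation('c.int, 'd.int)
val j = x.join(y, Inner, Some('a === 'c))

// Attributes only: safe to look through when flattening joins.
val p1 = j.select('a, 'd).asInstanceOf[Project]
p1.projectList.forall(_.isInstanceOf[Attribute])          // true

// Renamed or computed columns: flattening should stop here.
val p2 = j.select('a.as('a2), ('a + 'b).as('s)).asInstanceOf[Project]
p2.projectList.forall(_.isInstanceOf[Attribute])          // false
```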


---




[GitHub] spark pull request #20345: [SPARK-23172][SQL] Expand the ReorderJoin rule to...

2018-03-20 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/20345#discussion_r175696302
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
 ---
@@ -172,17 +174,23 @@ object ExtractFiltersAndInnerJoins extends 
PredicateHelper {
 case Filter(filterCondition, j @ Join(left, right, _: InnerLike, 
joinCondition)) =>
   val (plans, conditions) = flattenJoin(j)
   (plans, conditions ++ splitConjunctivePredicates(filterCondition))
-
+case p @ Project(_, j @ Join(left, right, _: InnerLike, 
joinCondition)) =>
+  // Keep flattening joins when projects having attributes only
--- End diff --

nit: when projects having attributes only => when the project has 
attributes only


---
