[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-22 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19281


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-21 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r140381057
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala ---
@@ -96,6 +96,24 @@ object SortOrder {
       sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix.
+   */
+  def orderingSatisfies(seq1: Seq[SortOrder], seq2: Seq[SortOrder]): Boolean = {
--- End diff --

I had originally named them "actualOrdering" and "requiredOrdering" in SparkPlan, but since I've moved the function here to SortOrder, I thought the names should be more general.





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r140215077
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -787,4 +789,64 @@ class JoinSuite extends QueryTest with SharedSQLContext {
   }
 }
   }
+
+  test("test SortMergeJoin output ordering") {
+    val joinQueries = Seq(
+      "SELECT * FROM testData JOIN testData2 ON key = a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN testData3 t3 ON t2.a = t3.a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN " +
+        "testData3 t3 ON t2.a = t3.a JOIN " +
+        "testData t4 ON t1.key = t4.key")
+
+    def assertJoinOrdering(sqlString: String): Unit = {
+      val df = sql(sqlString)
+      val physical = df.queryExecution.sparkPlan
+      val physicalJoins = physical.collect {
+        case j: SortMergeJoinExec => j
+      }
+      val executed = df.queryExecution.executedPlan
+      val executedJoins = executed.collect {
+        case j: SortMergeJoinExec => j
+      }
+      // This only applies to the above tested queries, in which a child SortMergeJoin always
+      // contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
+      // appear as parent of SortMergeJoin.
+      executed.foreach {
+        case s: SortExec => s.foreach {
+          case j: SortMergeJoinExec => fail(
+            s"No extra sort should be added since $j already satisfies the required ordering"
+          )
+          case _ =>
+        }
+        case _ =>
+      }
+      val joinPairs = physicalJoins.zip(executedJoins)
+      val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
+      assert(joinPairs.size == numOfJoins)
+
+      joinPairs.foreach {
+        case(join1, join2) =>
+          val leftKeys = join1.leftKeys
+          val rightKeys = join1.rightKeys
+          val outputOrderingPhysical = join1.outputOrdering
+          val outputOrderingExecuted = join2.outputOrdering
+
+          // outputOrdering should always contain join keys
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, leftKeys.map(SortOrder(_, Ascending))))
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, rightKeys.map(SortOrder(_, Ascending))))
+          // outputOrdering should be consistent between physical plan and executed plan
+          assert(outputOrderingPhysical == outputOrderingExecuted,
+            s"Physical operator $join1 did not have the same output ordering as " +
+            s"corresponding executed operator $join2")
--- End diff --

nit: physical/executed operator => physical/executed plan?





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r140191382
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -102,13 +102,22 @@ case class SortMergeJoinExec(
   }
 
   /**
-   * For SMJ, child's output must have been sorted on key or expressions with the same order as
-   * key, so we can get ordering for key from child's output ordering.
+   * The utility method to get output ordering for left or right side of the join.
+   *
+   * Returns the required ordering for left or right child if childOutputOrdering does not
+   * satisfy the required ordering; otherwise, in which case the child will not be re-sorted,
--- End diff --

nit: in case the child does not need to be sorted





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r140189310
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala ---
@@ -96,6 +96,24 @@ object SortOrder {
       sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix.
+   */
+  def orderingSatisfies(seq1: Seq[SortOrder], seq2: Seq[SortOrder]): Boolean = {
--- End diff --

nit: `seq1` => `ordering`, `seq2` => `requiredOrdering`





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-21 Thread wzhfy
Github user wzhfy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r140188874
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala ---
@@ -96,6 +96,24 @@ object SortOrder {
       sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix.
--- End diff --

It would be better to add an example in the comment: `E.g. If ordering A is [x, y] and Ordering B is [x], then A satisfies B.`
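The prefix rule in this doc comment, including the suggested [x, y] / [x] example, can be sketched on a simplified model. `OrderingSketch`, `Col`, `Order`, `Asc`, and `Desc` are hypothetical stand-ins, not Spark's `Expression`/`SortOrder` API; `satisfies` here only compares direction and column equivalence (Spark's `SortOrder` also carries a null ordering), and the parameter names follow the `ordering`/`requiredOrdering` naming suggested in this thread.

```scala
// Hypothetical, simplified stand-ins for Spark's Expression and SortOrder; not Spark's API.
object OrderingSketch {
  final case class Col(name: String)

  sealed trait Direction
  case object Asc extends Direction
  case object Desc extends Direction

  // `sameOrder` plays the role of sameOrderExpressions: columns known to be sorted
  // the same way as `child`. Null ordering is ignored for brevity.
  final case class Order(child: Col, direction: Direction, sameOrder: Set[Col] = Set.empty) {
    def satisfies(required: Order): Boolean =
      direction == required.direction && (sameOrder + child).contains(required.child)
  }

  // `ordering` satisfies `requiredOrdering` iff `requiredOrdering` is equivalent to
  // `ordering` or to a prefix of it, compared element by element via `satisfies`.
  def orderingSatisfies(ordering: Seq[Order], requiredOrdering: Seq[Order]): Boolean =
    requiredOrdering.length <= ordering.length &&
      requiredOrdering.zip(ordering).forall { case (required, actual) =>
        actual.satisfies(required)
      }
}
```

With `x = Col("x")` and `y = Col("y")`, `orderingSatisfies(Seq(Order(x, Asc), Order(y, Asc)), Seq(Order(x, Asc)))` is `true`, and swapping the two arguments gives `false`.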





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139875187
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -101,14 +101,15 @@ case class SortMergeJoinExec(
           s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  /**
-   * For SMJ, child's output must have been sorted on key or expressions with the same order as
-   * key, so we can get ordering for key from child's output ordering.
-   */
   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
       : Seq[SortOrder] = {
-    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
-      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
+    val requiredOrdering = requiredOrders(keys)
+    if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
--- End diff --

Please add a comment here to explain the reason.





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139875333
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 }
   }
 
+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
--- End diff --

Please add a function comment explaining what it does.





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139873950
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
       ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

Let us first move it to `SortOrder`. 





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139873547
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
       ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
+    if (required.nonEmpty) {
+      if (required.length > actual.length) {
+        false
+      } else {
+        required.zip(actual).forall {
+          case (requiredOrder, actualOrder) =>
+            actualOrder.satisfies(requiredOrder)
+        }
+      }
+    } else {
+      true
+    }
--- End diff --

Please simplify it to 
```Scala
if (required.isEmpty) {
  true
} else if (required.length > actual.length) {
  false
} else {
  required.zip(actual).forall { case (requiredOrder, actualOrder) =>
    actualOrder.satisfies(requiredOrder)
  }
}
```
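The `required.length > actual.length` check in this suggestion matters because `zip` truncates to the shorter sequence. A minimal sketch, assuming plain strings as a stand-in for `SortOrder` and `==` as a stand-in for `satisfies` (`ZipGuardSketch`, `unguarded`, and `guarded` are hypothetical names):

```scala
// Strings stand in for SortOrder and `==` for `satisfies`; both are simplifications.
object ZipGuardSketch {
  // Without a length check: `zip` stops at the shorter sequence, so a longer
  // `required` is only compared on its first `actual.length` elements.
  def unguarded(actual: Seq[String], required: Seq[String]): Boolean =
    required.zip(actual).forall { case (r, a) => r == a }

  // Same shape as the suggested version, including the length check.
  def guarded(actual: Seq[String], required: Seq[String]): Boolean =
    if (required.isEmpty) true
    else if (required.length > actual.length) false
    else required.zip(actual).forall { case (r, a) => r == a }
}
```

Here `unguarded(Seq("x"), Seq("x", "y"))` is `true` even though a sort on `x` alone does not provide `[x, y]`, while `guarded` returns `false`. The `isEmpty` branch is not strictly required, since `forall` over an empty sequence is already `true`, but it states the intent directly.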





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139850950
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -101,14 +101,15 @@ case class SortMergeJoinExec(
           s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
-  /**
-   * For SMJ, child's output must have been sorted on key or expressions with the same order as
-   * key, so we can get ordering for key from child's output ordering.
-   */
   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
       : Seq[SortOrder] = {
-    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
-      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
+    val requiredOrdering = requiredOrders(keys)
+    if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
--- End diff --

This looks good to me. cc @wzhfy who last touched this code





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139850236
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 }
   }
 
+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
--- End diff --

BTW: since this is only used by one test case, we could put it inside the test case method rather than at class level.





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139850127
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
 }
   }
 
+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
--- End diff --

nit: comparing the counts does not ensure that the sorts are in the right place. I wish there were an easier way to verify that here, but I can't think of one.
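Checking where the sorts sit, rather than how many there are, comes down to walking the executed plan and failing if a sort has a join beneath it; the revised `JoinSuite` test in the 2017-09-21 diff does this with `executed.foreach` over `SortExec` nodes. A minimal sketch of that check on a hypothetical, simplified plan tree (`PlacementSketch`, `Plan`, `Scan`, `Sort`, and `Join` are illustrative, not Spark's `SparkPlan` classes):

```scala
// Hypothetical, simplified plan tree; not Spark's SparkPlan/SortExec/SortMergeJoinExec.
object PlacementSketch {
  sealed trait Plan { def children: Seq[Plan] }
  final case class Scan(table: String) extends Plan { val children: Seq[Plan] = Nil }
  final case class Sort(child: Plan) extends Plan { val children: Seq[Plan] = Seq(child) }
  final case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // All nodes of the tree in pre-order, similar in spirit to TreeNode.foreach.
  def nodes(p: Plan): Seq[Plan] = p +: p.children.flatMap(nodes)

  // True if any Sort node has a Join somewhere in its subtree, which is the
  // situation the revised test reports as an unnecessary extra sort.
  def sortAboveJoin(plan: Plan): Boolean =
    nodes(plan).exists {
      case Sort(child) => nodes(child).exists(_.isInstanceOf[Join])
      case _           => false
    }
}
```

As the comment in that test notes, this expectation only holds for queries where each child join already provides the ordering its parent join requires.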





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139849801
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
       ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

SparkPlan is the node for physical operators in SQL, so it doesn't feel like a good place for this. Since one would want all methods related to `SortOrder` in a single place, the `SortOrder` object feels like a better option. We can revisit this if more such methods get added to that object, and refactor them into a class then.





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread maryannxue
Github user maryannxue commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139844429
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
       ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

I had actually struggled over where to put this, SortOrder or SparkPlan. Nothing else seems to be using Seq[SortOrder] so far, so I chose to leave this out of SortOrder. I think, though, that if we see potential usage of Seq[SortOrder] elsewhere, it might be worth wrapping it in a class. Agreed? Either way, I can put it into SortOrder for now.





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/19281#discussion_r139820801
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
       ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

How about moving this to the `SortOrder` object: https://github.com/apache/spark/blob/e9c91badce64731ffd3e53cbcd9f044a7593e6b8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala#L92





[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...

2017-09-19 Thread maryannxue
GitHub user maryannxue opened a pull request:

https://github.com/apache/spark/pull/19281

[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

## What changes were proposed in this pull request?

Right now, the calculation of SortMergeJoinExec's outputOrdering relies on the assumption that its children have already been sorted on the join keys, which is often not true until EnsureRequirements has been applied. As a result, we do not get the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.

For example, J = {A join B on key1 = key2}
1. If A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. If A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. If A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"

So to fix this, I changed the behavior of getKeyOrdering(keys, childOutputOrdering) to:
1. If childOutputOrdering satisfies the required child ordering (i.e. the required ordering is equivalent to childOutputOrdering or to a prefix of it) => childOutputOrdering
2. Otherwise => required child ordering
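A minimal sketch of the old and new `getKeyOrdering` behaviour on a simplified model, checked against the three cases listed above. `KeyOrderingSketch`, `Col`, `Order`, `oldKeyOrdering`, and `newKeyOrdering` are hypothetical names, the model covers ascending order only, and in the satisfied branch the sketch reuses the per-key mapping from the removed lines (so equivalent expressions carry over, as case 3 expects); that last point is an assumption about the final patch, not a copy of it.

```scala
// Hypothetical simplified model (ascending only, no null ordering); not Spark's API.
object KeyOrderingSketch {
  final case class Col(name: String)

  // `sameOrder` stands in for SortOrder.sameOrderExpressions.
  final case class Order(child: Col, sameOrder: Set[Col] = Set.empty) {
    def satisfies(required: Order): Boolean = (sameOrder + child).contains(required.child)
  }

  def orderingSatisfies(ordering: Seq[Order], required: Seq[Order]): Boolean =
    required.length <= ordering.length &&
      required.zip(ordering).forall { case (r, a) => a.satisfies(r) }

  // Old behaviour (the removed lines): assumes the child is already sorted on the keys,
  // so an unsorted child (empty ordering) yields an empty result.
  def oldKeyOrdering(keys: Seq[Col], childOrdering: Seq[Order]): Seq[Order] =
    keys.zip(childOrdering).map { case (key, c) => Order(key, c.sameOrder + c.child - key) }

  // Described behaviour: keep the child's ordering information when it satisfies the
  // required ordering; otherwise fall back to the required ordering itself.
  def newKeyOrdering(keys: Seq[Col], childOrdering: Seq[Order]): Seq[Order] = {
    val required = keys.map(k => Order(k))
    if (orderingSatisfies(childOrdering, required)) oldKeyOrdering(keys, childOrdering)
    else required
  }
}
```

With `key1 = Col("key1")`, `oldKeyOrdering(Seq(key1), Nil)` is empty (the bug in case 1), while `newKeyOrdering(Seq(key1), Nil)` is `Seq(Order(key1))`.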

In addition, I moved the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements, SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases.
Passed all integration tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/maryannxue/spark spark-21998

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19281.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19281


commit 7893935d694663316575a7485ea833fab998d108
Author: maryannxue 
Date:   2017-09-19T18:23:15Z

[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning



