[GitHub] spark pull request #19281: [SPARK-21998][SQL] SortMergeJoinExec did not calc...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19281 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r140381057

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala ---
@@ -96,6 +96,24 @@ object SortOrder {
       sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix.
+   */
+  def orderingSatisfies(seq1: Seq[SortOrder], seq2: Seq[SortOrder]): Boolean = {
--- End diff --

I had originally named them "actualOrdering" and "requiredOrdering" in SparkPlan, but since I have moved the function here to SortOrder, I thought the names should be more general.
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r140215077

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -787,4 +789,64 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("test SortMergeJoin output ordering") {
+    val joinQueries = Seq(
+      "SELECT * FROM testData JOIN testData2 ON key = a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN testData3 t3 ON t2.a = t3.a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN " +
+        "testData3 t3 ON t2.a = t3.a JOIN " +
+        "testData t4 ON t1.key = t4.key")
+
+    def assertJoinOrdering(sqlString: String): Unit = {
+      val df = sql(sqlString)
+      val physical = df.queryExecution.sparkPlan
+      val physicalJoins = physical.collect {
+        case j: SortMergeJoinExec => j
+      }
+      val executed = df.queryExecution.executedPlan
+      val executedJoins = executed.collect {
+        case j: SortMergeJoinExec => j
+      }
+      // This only applies to the above tested queries, in which a child SortMergeJoin always
+      // contains the SortOrder required by its parent SortMergeJoin. Thus, SortExec should never
+      // appear as parent of SortMergeJoin.
+      executed.foreach {
+        case s: SortExec => s.foreach {
+          case j: SortMergeJoinExec => fail(
+            s"No extra sort should be added since $j already satisfies the required ordering")
+          case _ =>
+        }
+        case _ =>
+      }
+      val joinPairs = physicalJoins.zip(executedJoins)
+      val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
+      assert(joinPairs.size == numOfJoins)
+
+      joinPairs.foreach {
+        case (join1, join2) =>
+          val leftKeys = join1.leftKeys
+          val rightKeys = join1.rightKeys
+          val outputOrderingPhysical = join1.outputOrdering
+          val outputOrderingExecuted = join2.outputOrdering
+
+          // outputOrdering should always contain join keys
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, leftKeys.map(SortOrder(_, Ascending))))
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, rightKeys.map(SortOrder(_, Ascending))))
+          // outputOrdering should be consistent between physical plan and executed plan
+          assert(outputOrderingPhysical == outputOrderingExecuted,
+            s"Physical operator $join1 did not have the same output ordering as " +
+              s"corresponding executed operator $join2")
--- End diff --

nit: physical/executed operator => physical/executed plan?
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r140191382

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -102,13 +102,22 @@ case class SortMergeJoinExec(
   }

   /**
-   * For SMJ, child's output must have been sorted on key or expressions with the same order as
-   * key, so we can get ordering for key from child's output ordering.
+   * The utility method to get output ordering for left or right side of the join.
+   *
+   * Returns the required ordering for left or right child if childOutputOrdering does not
+   * satisfy the required ordering; otherwise, in which case the child will not be re-sorted,
--- End diff --

nit: in case the child does not need to be sorted
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r140189310

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala ---
@@ -96,6 +96,24 @@ object SortOrder {
       sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix.
+   */
+  def orderingSatisfies(seq1: Seq[SortOrder], seq2: Seq[SortOrder]): Boolean = {
--- End diff --

nit: `seq1` => `ordering`, `seq2` => `requiredOrdering`
Github user wzhfy commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r140188874

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala ---
@@ -96,6 +96,24 @@ object SortOrder {
       sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is an equivalent of A
+   * or of A's prefix.
--- End diff --

It would be better to add an example in the comment: `E.g. if ordering A is [x, y] and ordering B is [x], then A satisfies B.`
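The prefix rule suggested in that comment can be made concrete with a small, self-contained sketch. The `Order` case class below is a hypothetical stand-in for Catalyst's `SortOrder` (the real `SortOrder.satisfies` also consults `sameOrderExpressions`; plain equality here is a simplification):

```scala
// Hypothetical stand-in for Catalyst's SortOrder: a column plus a direction.
case class Order(col: String, ascending: Boolean = true)

// Ordering A satisfies ordering B iff B is equivalent to A or to a prefix of A.
def orderingSatisfies(actual: Seq[Order], required: Seq[Order]): Boolean =
  required.isEmpty ||
    (required.length <= actual.length &&
      required.zip(actual).forall { case (r, a) => r == a })

// [x, y] satisfies [x] (a prefix), but [x] does not satisfy [x, y].
val xy = Seq(Order("x"), Order("y"))
assert(orderingSatisfies(xy, Seq(Order("x"))))
assert(!orderingSatisfies(Seq(Order("x")), xy))
assert(orderingSatisfies(xy, Seq.empty)) // an empty requirement is always satisfied
```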
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139875187

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -101,14 +101,15 @@ case class SortMergeJoinExec(
       s"${getClass.getSimpleName} should not take $x as the JoinType")
   }

-  /**
-   * For SMJ, child's output must have been sorted on key or expressions with the same order as
-   * key, so we can get ordering for key from child's output ordering.
-   */
   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
     : Seq[SortOrder] = {
-    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
-      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
+    val requiredOrdering = requiredOrders(keys)
+    if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
--- End diff --

Please add a comment here to explain the reason.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139875333

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     }
   }

+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
--- End diff --

Please add function comments to explain what it does.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139873950

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

Let us first move it to `SortOrder`.
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139873547

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
+    if (required.nonEmpty) {
+      if (required.length > actual.length) {
+        false
+      } else {
+        required.zip(actual).forall {
+          case (requiredOrder, actualOrder) =>
+            actualOrder.satisfies(requiredOrder)
+        }
+      }
+    } else {
+      true
+    }
--- End diff --

Please simplify it to

```Scala
if (required.isEmpty) {
  true
} else if (required.length > actual.length) {
  false
} else {
  required.zip(actual).forall { case (requiredOrder, actualOrder) =>
    actualOrder.satisfies(requiredOrder)
  }
}
```
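One motivation for this helper is the planner's decision of whether a child needs an extra sort. Below is a hedged, self-contained sketch of that decision in the spirit of EnsureRequirements; all types here (`Order`, `Plan`, `Scan`, `SortNode`) are hypothetical stand-ins, not the real Spark classes, and equality stands in for `SortOrder.satisfies`:

```scala
case class Order(col: String)

// Simplified version of the satisfiability check discussed above.
def orderingSatisfies(actual: Seq[Order], required: Seq[Order]): Boolean =
  if (required.isEmpty) true
  else if (required.length > actual.length) false
  else required.zip(actual).forall { case (r, a) => a == r }

sealed trait Plan { def outputOrdering: Seq[Order] }
case class Scan(outputOrdering: Seq[Order]) extends Plan
case class SortNode(order: Seq[Order], child: Plan) extends Plan {
  def outputOrdering: Seq[Order] = order
}

// Insert a sort only when the child's ordering does not already satisfy the
// requirement -- the decision this helper serves in physical planning.
def ensureOrdering(child: Plan, required: Seq[Order]): Plan =
  if (orderingSatisfies(child.outputOrdering, required)) child
  else SortNode(required, child)

val sorted = Scan(Seq(Order("x"), Order("y")))
assert(ensureOrdering(sorted, Seq(Order("x"))) == sorted)              // no extra sort
assert(ensureOrdering(sorted, Seq(Order("z"))).isInstanceOf[SortNode]) // sort added
```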
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139850950

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -101,14 +101,15 @@ case class SortMergeJoinExec(
       s"${getClass.getSimpleName} should not take $x as the JoinType")
   }

-  /**
-   * For SMJ, child's output must have been sorted on key or expressions with the same order as
-   * key, so we can get ordering for key from child's output ordering.
-   */
   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: Seq[SortOrder])
     : Seq[SortOrder] = {
-    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
-      SortOrder(key, Ascending, childOrder.sameOrderExpressions + childOrder.child - key)
+    val requiredOrdering = requiredOrders(keys)
+    if (SparkPlan.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
--- End diff --

This looks good to me. cc @wzhfy who last touched this code
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139850236

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     }
   }

+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
--- End diff --

BTW: since this is only used by one test case, we could put it inside the test case method rather than making it class-level.
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139850127

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
@@ -64,6 +67,42 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     }
   }

+  def assertJoinOrdering(sqlString: String, numOfJoin: Int, numOfSort: Int): Any = {
--- End diff --

nit: comparing the counts does not ensure that the sorts are in the right place. I wish there was an easier way to assert that here, but I can't think of any.
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139849801

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

SparkPlan is the node for physical operators in SQL, so it doesn't feel like a good place to have this. Since one would want to have all methods related to `SortOrder` in a single place, the object class feels like the better option. We can revisit and refactor it into a class if more such methods are added to that object.
Github user maryannxue commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139844429

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

Actually I had struggled with where to put this, SortOrder or SparkPlan. It doesn't look like anywhere else is using Seq[SortOrder] so far, so I chose to leave this out of SortOrder. I think, though, if we see potential usage of Seq[SortOrder] elsewhere, it might be worth wrapping it in a class. Agree? Either way, I can put it into SortOrder for now.
Github user tejasapatil commented on a diff in the pull request: https://github.com/apache/spark/pull/19281#discussion_r139820801

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala ---
@@ -396,6 +396,26 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
 object SparkPlan {
   private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+
+  /**
+   * Returns if the actual ordering satisfies the required ordering.
+   *
+   * Ordering A satisfies ordering B if and only if B is an equivalent of A or of A's prefix.
+   */
+  def orderingSatisfies(actual: Seq[SortOrder], required: Seq[SortOrder]): Boolean = {
--- End diff --

How about moving this to the `SortOrder` object: https://github.com/apache/spark/blob/e9c91badce64731ffd3e53cbcd9f044a7593e6b8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala#L92
GitHub user maryannxue opened a pull request: https://github.com/apache/spark/pull/19281

[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning

## What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we ended up not getting the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.

For example, let J = {A join B on key1 = key2}:
1. If A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC".
2. If A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC".
3. If A is ordered on key1 ASC with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1".

So to fix this, I changed the behavior of getKeyOrdering(keys, childOutputOrdering) to:
1. If the childOutputOrdering satisfies (is a superset of) the required child ordering, return childOutputOrdering.
2. Otherwise, return the required child ordering.

In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases. Passed all integration tests.
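The fixed getKeyOrdering behavior described above can be sketched in a self-contained way. This is a simplified model, assuming string expressions in place of Catalyst's Expression and a cut-down `Order` in place of SortOrder; it is not the real Spark implementation:

```scala
// Cut-down stand-in for SortOrder: the sort expression plus the set of
// expressions known to share the same order (sameOrderExpressions).
case class Order(expr: String, sameOrderExprs: Set[String] = Set.empty)

// The actual ordering satisfies the required one iff the required ordering is
// an equivalent of a prefix; "equivalent" here also accepts same-order matches.
def orderingSatisfies(actual: Seq[Order], required: Seq[Order]): Boolean =
  required.isEmpty ||
    (required.length <= actual.length &&
      required.zip(actual).forall { case (r, a) =>
        a.expr == r.expr || a.sameOrderExprs.contains(r.expr)
      })

def getKeyOrdering(keys: Seq[String], childOutputOrdering: Seq[Order]): Seq[Order] = {
  val requiredOrdering = keys.map(Order(_))
  if (orderingSatisfies(childOutputOrdering, requiredOrdering)) {
    // Case 1: the child will not be re-sorted, so keep its richer ordering,
    // folding each child order's expression into the key's same-order set.
    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
      Order(key, childOrder.sameOrderExprs + childOrder.expr - key)
    }
  } else {
    // Case 2: the child will be re-sorted on the keys, so only the required
    // key ordering can be promised.
    requiredOrdering
  }
}

// Child already sorted on key1 with same-order expression c1: preserved.
assert(getKeyOrdering(Seq("key1"), Seq(Order("key1", Set("c1")))) ==
  Seq(Order("key1", Set("c1"))))
// Child unsorted on the key: fall back to the required key ordering.
assert(getKeyOrdering(Seq("key1"), Seq(Order("other"))) == Seq(Order("key1")))
```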
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maryannxue/spark spark-21998

Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/19281.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19281

commit 7893935d694663316575a7485ea833fab998d108
Author: maryannxue
Date: 2017-09-19T18:23:15Z

    [SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning