Repository: spark
Updated Branches:
  refs/heads/master 5ac96854c -> 5960686e7


[SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering 
correctly during physical planning

## What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the 
fact that its children have already been sorted on the join keys, while this is 
often not true until EnsureRequirements has been applied. So we ended up not 
getting the correct outputOrdering during physical planning stage before Sort 
nodes are added to the children.

For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should 
include "key1 ASC, sameOrderExp=c1"

So to fix this I changed the  behavior of <code>getKeyOrdering(keys, 
childOutputOrdering)</code> to:
1. If the childOutputOrdering satisfies (is a superset of) the required child 
ordering => childOutputOrdering
2. Otherwise => required child ordering

In addition, I organized the logic for deciding the relationship between two 
orderings into SparkPlan, so that it can be reused by EnsureRequirements and 
SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases.
Passed all integration tests.

Author: maryannxue <maryann....@gmail.com>

Closes #19281 from maryannxue/spark-21998.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5960686e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5960686e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5960686e

Branch: refs/heads/master
Commit: 5960686e791b5d6642a30c43c1de61e96e594a5e
Parents: 5ac9685
Author: maryannxue <maryann....@gmail.com>
Authored: Thu Sep 21 23:54:16 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Sep 21 23:54:16 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/SortOrder.scala    | 23 ++++++++
 .../execution/exchange/EnsureRequirements.scala | 21 ++-----
 .../sql/execution/joins/SortMergeJoinExec.scala | 17 ++++--
 .../scala/org/apache/spark/sql/JoinSuite.scala  | 62 ++++++++++++++++++++
 4 files changed, 102 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
index abcb9a2..ff7c98f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala
@@ -96,6 +96,29 @@ object SortOrder {
      sameOrderExpressions: Set[Expression] = Set.empty): SortOrder = {
     new SortOrder(child, direction, direction.defaultNullOrdering, 
sameOrderExpressions)
   }
+
+  /**
+   * Returns if a sequence of SortOrder satisfies another sequence of 
SortOrder.
+   *
+   * SortOrder sequence A satisfies SortOrder sequence B if and only if B is 
an equivalent of A
+   * or of A's prefix. Here are examples of ordering A satisfying ordering B:
+   * <ul>
+   *   <li>ordering A is [x, y] and ordering B is [x]</li>
+   *   <li>ordering A is [x(sameOrderExpressions=x1)] and ordering B is 
[x1]</li>
+   *   <li>ordering A is [x(sameOrderExpressions=x1), y] and ordering B is 
[x1]</li>
+   * </ul>
+   */
+  def orderingSatisfies(ordering1: Seq[SortOrder], ordering2: Seq[SortOrder]): 
Boolean = {
+    if (ordering2.isEmpty) {
+      true
+    } else if (ordering2.length > ordering1.length) {
+      false
+    } else {
+      ordering2.zip(ordering1).forall {
+        case (o2, o1) => o1.satisfies(o2)
+      }
+    }
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index b91d077..1da72f2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -234,24 +234,11 @@ case class EnsureRequirements(conf: SQLConf) extends 
Rule[SparkPlan] {
 
     // Now that we've performed any necessary shuffles, add sorts to guarantee 
output orderings:
     children = children.zip(requiredChildOrderings).map { case (child, 
requiredOrdering) =>
-      if (requiredOrdering.nonEmpty) {
-        // If child.outputOrdering is [a, b] and requiredOrdering is [a], we 
do not need to sort.
-        val orderingMatched = if (requiredOrdering.length > 
child.outputOrdering.length) {
-          false
-        } else {
-          requiredOrdering.zip(child.outputOrdering).forall {
-            case (requiredOrder, childOutputOrder) =>
-              childOutputOrder.satisfies(requiredOrder)
-          }
-        }
-
-        if (!orderingMatched) {
-          SortExec(requiredOrdering, global = false, child = child)
-        } else {
-          child
-        }
-      } else {
+      // If child.outputOrdering already satisfies the requiredOrdering, we do 
not need to sort.
+      if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) 
{
         child
+      } else {
+        SortExec(requiredOrdering, global = false, child = child)
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
index 91d214e..14de2dc 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
@@ -102,13 +102,22 @@ case class SortMergeJoinExec(
   }
 
   /**
-   * For SMJ, child's output must have been sorted on key or expressions with 
the same order as
-   * key, so we can get ordering for key from child's output ordering.
+   * The utility method to get output ordering for left or right side of the 
join.
+   *
+   * Returns the required ordering for left or right child if 
childOutputOrdering does not
+   * satisfy the required ordering; otherwise, which means the child does not 
need to be sorted
+   * again, returns the required ordering for this child with extra 
"sameOrderExpressions" from
+   * the child's outputOrdering.
    */
   private def getKeyOrdering(keys: Seq[Expression], childOutputOrdering: 
Seq[SortOrder])
     : Seq[SortOrder] = {
-    keys.zip(childOutputOrdering).map { case (key, childOrder) =>
-      SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
childOrder.child - key)
+    val requiredOrdering = requiredOrders(keys)
+    if (SortOrder.orderingSatisfies(childOutputOrdering, requiredOrdering)) {
+      keys.zip(childOutputOrdering).map { case (key, childOrder) =>
+        SortOrder(key, Ascending, childOrder.sameOrderExpressions + 
childOrder.child - key)
+      }
+    } else {
+      requiredOrdering
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5960686e/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 453052a..9d50e8b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -24,6 +24,8 @@ import scala.language.existentials
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
+import org.apache.spark.sql.execution.SortExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -787,4 +789,64 @@ class JoinSuite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("test SortMergeJoin output ordering") {
+    val joinQueries = Seq(
+      "SELECT * FROM testData JOIN testData2 ON key = a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN testData3 t3 ON t2.a = t3.a",
+      "SELECT * FROM testData t1 JOIN " +
+        "testData2 t2 ON t1.key = t2.a JOIN " +
+        "testData3 t3 ON t2.a = t3.a JOIN " +
+        "testData t4 ON t1.key = t4.key")
+
+    def assertJoinOrdering(sqlString: String): Unit = {
+      val df = sql(sqlString)
+      val physical = df.queryExecution.sparkPlan
+      val physicalJoins = physical.collect {
+        case j: SortMergeJoinExec => j
+      }
+      val executed = df.queryExecution.executedPlan
+      val executedJoins = executed.collect {
+        case j: SortMergeJoinExec => j
+      }
+      // This only applies to the above tested queries, in which a child 
SortMergeJoin always
+      // contains the SortOrder required by its parent SortMergeJoin. Thus, 
SortExec should never
+      // appear as parent of SortMergeJoin.
+      executed.foreach {
+        case s: SortExec => s.foreach {
+          case j: SortMergeJoinExec => fail(
+            s"No extra sort should be added since $j already satisfies the 
required ordering"
+          )
+          case _ =>
+        }
+        case _ =>
+      }
+      val joinPairs = physicalJoins.zip(executedJoins)
+      val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
+      assert(joinPairs.size == numOfJoins)
+
+      joinPairs.foreach {
+        case(join1, join2) =>
+          val leftKeys = join1.leftKeys
+          val rightKeys = join1.rightKeys
+          val outputOrderingPhysical = join1.outputOrdering
+          val outputOrderingExecuted = join2.outputOrdering
+
+          // outputOrdering should always contain join keys
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, leftKeys.map(SortOrder(_, Ascending))))
+          assert(
+            SortOrder.orderingSatisfies(
+              outputOrderingPhysical, rightKeys.map(SortOrder(_, Ascending))))
+          // outputOrdering should be consistent between physical plan and 
executed plan
+          assert(outputOrderingPhysical == outputOrderingExecuted,
+            s"Operator $join1 did not have the same output ordering in the 
physical plan as in " +
+            s"the executed plan.")
+      }
+    }
+
+    joinQueries.foreach(assertJoinOrdering)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to