Github user wzhfy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19281#discussion_r140215077
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala ---
    @@ -787,4 +789,64 @@ class JoinSuite extends QueryTest with 
SharedSQLContext {
           }
         }
       }
    +
    +  test("test SortMergeJoin output ordering") {
    +    val joinQueries = Seq(
    +      "SELECT * FROM testData JOIN testData2 ON key = a",
    +      "SELECT * FROM testData t1 JOIN " +
    +        "testData2 t2 ON t1.key = t2.a JOIN testData3 t3 ON t2.a = t3.a",
    +      "SELECT * FROM testData t1 JOIN " +
    +        "testData2 t2 ON t1.key = t2.a JOIN " +
    +        "testData3 t3 ON t2.a = t3.a JOIN " +
    +        "testData t4 ON t1.key = t4.key")
    +
    +    def assertJoinOrdering(sqlString: String): Unit = {
    +      val df = sql(sqlString)
    +      val physical = df.queryExecution.sparkPlan
    +      val physicalJoins = physical.collect {
    +        case j: SortMergeJoinExec => j
    +      }
    +      val executed = df.queryExecution.executedPlan
    +      val executedJoins = executed.collect {
    +        case j: SortMergeJoinExec => j
    +      }
    +      // This only applies to the above tested queries, in which a child 
SortMergeJoin always
    +      // contains the SortOrder required by its parent SortMergeJoin. 
Thus, SortExec should never
    +      // appear as parent of SortMergeJoin.
    +      executed.foreach {
    +        case s: SortExec => s.foreach {
    +          case j: SortMergeJoinExec => fail(
    +            s"No extra sort should be added since $j already satisfies the 
required ordering"
    +          )
    +          case _ =>
    +        }
    +        case _ =>
    +      }
    +      val joinPairs = physicalJoins.zip(executedJoins)
    +      val numOfJoins = sqlString.split(" ").count(_.toUpperCase == "JOIN")
    +      assert(joinPairs.size == numOfJoins)
    +
    +      joinPairs.foreach {
    +        case(join1, join2) =>
    +          val leftKeys = join1.leftKeys
    +          val rightKeys = join1.rightKeys
    +          val outputOrderingPhysical = join1.outputOrdering
    +          val outputOrderingExecuted = join2.outputOrdering
    +
    +          // outputOrdering should always contain join keys
    +          assert(
    +            SortOrder.orderingSatisfies(
    +              outputOrderingPhysical, leftKeys.map(SortOrder(_, 
Ascending))))
    +          assert(
    +            SortOrder.orderingSatisfies(
    +              outputOrderingPhysical, rightKeys.map(SortOrder(_, 
Ascending))))
    +          // outputOrdering should be consistent between physical plan and 
executed plan
    +          assert(outputOrderingPhysical == outputOrderingExecuted,
    +            s"Physical operator $join1 did not have the same output 
ordering as " +
    +            s"corresponding executed operator $join2")
    --- End diff --
    
    nit: in the assertion failure message, "Physical operator" / "executed operator" => "physical plan" / "executed plan"?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to