Github user maryannxue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20345#discussion_r198208026
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
    @@ -84,19 +84,51 @@ object ReorderJoin extends Rule[LogicalPlan] with 
PredicateHelper {
         }
       }
     
    +  // Extract a list of logical plans to be joined for join-order 
comparisons.
    +  // Since `ExtractFiltersAndInnerJoins` handles left-deep trees only, 
this function have
    +  // the same strategy to extract the plan list.
    +  private[optimizer] def extractLeftDeepInnerJoins(plan: LogicalPlan)
    +    : Seq[LogicalPlan] = plan match {
    +    case j @ Join(left, right, _: InnerLike, _) => right +: 
extractLeftDeepInnerJoins(left)
    +    case Filter(_, child) => extractLeftDeepInnerJoins(child)
    +    case Project(_, child) => extractLeftDeepInnerJoins(child)
    +    case _ => Seq(plan)
    +  }
    +
    +  private def sameJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan): 
Boolean = {
    +    extractLeftDeepInnerJoins(plan1) == extractLeftDeepInnerJoins(plan2)
    +  }
    +
    +  private def mayCreateOrderedJoin(
    +      originalPlan: LogicalPlan,
    +      input: Seq[(LogicalPlan, InnerLike)],
    +      conditions: Seq[Expression]): LogicalPlan = {
    +    val orderedJoins = createOrderedJoin(input, conditions)
    +    if (!sameJoinOrder(orderedJoins, originalPlan)) {
    +      if (originalPlan.output != orderedJoins.output) {
    +        // Keep the same output attributes and the order
    +        Project(originalPlan.output, orderedJoins)
    +      } else {
    +        orderedJoins
    +      }
    +    } else {
    +      originalPlan
    +    }
    +  }
    +
       def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    -    case ExtractFiltersAndInnerJoins(input, conditions)
    +    case p @ ExtractFiltersAndInnerJoins(input, conditions)
             if input.size > 2 && conditions.nonEmpty =>
           if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) {
             val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, 
conditions)
             if (starJoinPlan.nonEmpty) {
               val rest = input.filterNot(starJoinPlan.contains(_))
    -          createOrderedJoin(starJoinPlan ++ rest, conditions)
    +          mayCreateOrderedJoin(p, starJoinPlan ++ rest, conditions)
             } else {
    -          createOrderedJoin(input, conditions)
    +          mayCreateOrderedJoin(p, input, conditions)
             }
           } else {
    -        createOrderedJoin(input, conditions)
    +        mayCreateOrderedJoin(p, input, conditions)
           }
    --- End diff --
    
    How about restructuring it like this:
    ```
           val joinReorderedPlan = if (SQLConf.get.starSchemaDetection && 
!SQLConf.get.cboEnabled) {
             val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, 
conditions)
             if (starJoinPlan.nonEmpty) {
               val rest = input.filterNot(starJoinPlan.contains(_))
               createOrderedJoin(starJoinPlan ++ rest, conditions)
             } else {
               createOrderedJoin(input, conditions)
             }
           } else {
             createOrderedJoin(input, conditions)
           }
           projectIfNecessary(joinReorderedPlan, p)
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to