Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22326#discussion_r214991991
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
    @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
                 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
               val newRight = rightJoinConditions.
                 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
    -          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
    -
    -          Join(newLeft, newRight, joinType, newJoinCond)
    +          val (newJoinConditions, others) =
    +            commonJoinCondition.partition(canEvaluateWithinJoin)
    +          val newJoinCond = newJoinConditions.reduceLeftOption(And)
    +          val newJoinType =
    +            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
    +              if (SQLConf.get.crossJoinEnabled) {
     +                // If a condition expression is unevaluable, it is removed from
     +                // the new join conditions. If all conditions are unevaluable,
     +                // we change the join type to CrossJoin.
     +                logWarning(s"The whole commonJoinCondition:$commonJoinCondition " +
     +                  "of the join plan is unevaluable, so it will be ignored and the " +
     +                  s"join will be turned into a cross join. The plan is shown below:\n $j")
    +                Cross
    +              } else {
     +                // If crossJoinEnabled is false, an AnalysisException would be
     +                // thrown by [[CheckCartesianProducts]]; we throw it here first
     +                // to produce a more readable error message.
     +                throw new AnalysisException("Detected the whole commonJoinCondition:" +
     +                  s"$commonJoinCondition of the join plan is unevaluable. To convert " +
     +                  "the join to a cross join, set the configuration " +
     +                  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
    +              }
    +            } else {
    +              joinType
    +            }
    +
    +          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
    +          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
    +            Filter(others.reduceLeft(And), join)
    +          } else {
    +            join
    --- End diff --
    
    It means that in a left_semi join the output of the Join operator should 
contain only the attributes from the left side, so attributes from the right 
side should not be referenced after the join. Therefore the plan should be 
invalid. I am a bit surprised that it works; it would be great to understand 
why. Thanks.


---
