Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22326#discussion_r215271726
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
                 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
               val newRight = rightJoinConditions.
                 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
    -          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
    -
    -          Join(newLeft, newRight, joinType, newJoinCond)
    +          val (newJoinConditions, others) =
    +            commonJoinCondition.partition(canEvaluateWithinJoin)
    +          val newJoinCond = newJoinConditions.reduceLeftOption(And)
    +          val newJoinType =
    +            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
    +              if (SQLConf.get.crossJoinEnabled) {
    +                // If a condition expression is unevaluable, it will be removed from
    +                // the new join conditions; if all conditions are unevaluable, we should
    +                // change the join type to CrossJoin.
    +                logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
    +                  "plan is unevaluable, it will be ignored and the join plan will be " +
    +                  s"turned to cross join. This plan shows below:\n $j")
    +                Cross
    +              } else {
    +                // If crossJoinEnabled is false, an AnalysisException will be thrown by
    +                // [[CheckCartesianProducts]]; we throw it here first for more readable
    +                // information.
    +                throw new AnalysisException("Detected the whole commonJoinCondition:" +
    +                  s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
    +                  "join to cross join by setting the configuration variable " +
    +                  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
    +              }
    +            } else {
    +              joinType
    +            }
    +
    +          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
    +          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
    +            Filter(others.reduceLeft(And), join)
    +          } else {
    +            join
    --- End diff --
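    
    For context, here is a minimal PySpark sketch (hypothetical, not taken from the PR or its tests) of the case this branch handles: a Python UDF in the join condition cannot be evaluated inside the join operator, so the whole common condition becomes unevaluable and, with cross joins enabled, is pulled out above a cross join.
    
    ```
    # Hypothetical repro sketch: a Python UDF in the join condition is
    # unevaluable within the join itself, triggering the new branch above.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType
    
    spark = (SparkSession.builder
             # Spark 2.x key behind SQLConf.CROSS_JOINS_ENABLED
             .config("spark.sql.crossJoin.enabled", "true")
             .getOrCreate())
    
    left = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "x"])
    right = spark.createDataFrame([(1, "a"), (3, "c")], ["id", "y"])
    
    # The optimizer cannot evaluate a PythonUDF inside the join, so this
    # condition is unevaluable in the sense of canEvaluateWithinJoin.
    matches = udf(lambda a, b: a == b, BooleanType())
    
    # With this patch: inner join -> Cross join with Filter(matches(x, y))
    # on top; with crossJoin disabled, an AnalysisException is thrown first.
    left.join(right, matches(left.x, right.y), "inner").show()
    ```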
    
    ```
     I am a bit surprised that works, it would be great to understand why. Thanks.
    ```
    Sorry for the bad test, it was too special a case and the result was right only by accident. The original implementation made every semi join return `[]` in PySpark.
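    
    To make that failure mode concrete, here is a hedged sketch (hypothetical data and UDF, not the test from the PR): after a LEFT SEMI join only the left side's columns survive, so an unevaluable condition cannot simply be pulled above the join as a Filter, which is why the new Filter is guarded by `joinType.isInstanceOf[InnerLike]`.
    
    ```
    # Hypothetical sketch of the broken case: a Python UDF condition on a
    # LEFT SEMI join. The right side's columns do not survive the semi join,
    # so the condition cannot become a Filter above it; hence the patch only
    # adds the Filter for InnerLike join types.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType
    
    spark = SparkSession.builder.getOrCreate()
    
    left = spark.createDataFrame([(1,), (2,)], ["id"])
    right = spark.createDataFrame([(1,), (3,)], ["id"])
    same = udf(lambda a, b: a == b, BooleanType())
    
    # Expected result: [Row(id=1)]. Per the comment above, the original
    # version of this change made every such semi join return [] in PySpark.
    print(left.join(right, same(left.id, right.id), "leftsemi").collect())
    ```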

