Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22326#discussion_r215261610
  
    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
                 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
               val newRight = rightJoinConditions.
                 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
    -          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
    -
    -          Join(newLeft, newRight, joinType, newJoinCond)
    +          val (newJoinConditions, others) =
    +            commonJoinCondition.partition(canEvaluateWithinJoin)
    +          val newJoinCond = newJoinConditions.reduceLeftOption(And)
    +          val newJoinType =
    +            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
    +              if (SQLConf.get.crossJoinEnabled) {
    +                // If a condition expression is unevaluable, it will be removed from
    +                // the new join conditions; if all conditions are unevaluable, we should
    +                // change the join type to CrossJoin.
    +                logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
    +                  "plan is unevaluable; it will be ignored and the join plan will be " +
    +                  s"turned into a cross join. The plan is shown below:\n $j")
    +                Cross
    +              } else {
    +                // If crossJoinEnabled is false, an AnalysisException would be thrown by
    +                // [[CheckCartesianProducts]]; we throw one here first to give more
    +                // readable information.
    +                throw new AnalysisException("Detected that the whole commonJoinCondition:" +
    +                  s"$commonJoinCondition of the join plan is unevaluable. To proceed, " +
    +                  "convert the join to a cross join by setting the configuration " +
    +                  s"variable ${SQLConf.CROSS_JOINS_ENABLED.key}=true")
    +              }
    +            } else {
    +              joinType
    +            }
    +
    +          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
    +          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
    +            Filter(others.reduceLeft(And), join)
    +          } else {
    +            join
    --- End diff --
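
    For readers skimming the diff, here is a minimal, self-contained sketch of the rewrite it performs. The `Pred` type and `evaluableInJoin` flag are hypothetical stand-ins for Catalyst expressions and `canEvaluateWithinJoin`, not Spark's API:

        // Hypothetical model of the step above: predicates that cannot be
        // evaluated inside the join are split out; if nothing evaluable
        // remains, the join degenerates to a cross join and the split-out
        // predicates become a Filter on top (inner-like joins only).
        case class Pred(expr: String, evaluableInJoin: Boolean)

        def rewrite(common: Seq[Pred]): (String, Seq[Pred], Seq[Pred]) = {
          val (inJoin, above) = common.partition(_.evaluableInJoin)
          val joinType = if (common.nonEmpty && inJoin.isEmpty) "Cross" else "Inner"
          (joinType, inJoin, above) // `above` would become Filter(above.reduce(And), join)
        }

        // All conditions unevaluable: join type flips to Cross, predicates move above.
        println(rewrite(Seq(Pred("pyUdf(a, b)", evaluableInJoin = false))))
        // Mixed: the evaluable part stays in the join condition, the rest moves above.
        println(rewrite(Seq(
          Pred("a = b", evaluableInJoin = true),
          Pred("pyUdf(a, b)", evaluableInJoin = false))))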
    
    Thanks @mgaido91 and @dilipbiswal!
    I fixed this in 63fbcce. The main problem is a semi join whose condition mixes deterministic and non-deterministic predicates: adding a filter after the semi join will fail. I also added more tests on both the Python and Scala sides, covering semi joins, inner joins, and the complex scenario described below.
    Handling left semi joins made the strategy difficult to read, so in 63fbcce I split the logic for semi joins and inner joins.
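
    To illustrate the semi join problem, a hypothetical sketch (the `Attr`/`Relation` types are made up for illustration, not the PR's actual test code): a LeftSemi join outputs only the left side's columns, so a pulled-out condition referencing the right side cannot be re-applied as a Filter above the join, which is why the Filter fallback in the diff is restricted to InnerLike join types:

        // Why Filter-above-join fails for LeftSemi: right-side attributes
        // are no longer visible in the semi join's output.
        case class Attr(name: String)
        case class Relation(output: Seq[Attr])

        // LeftSemi keeps only the left relation's attributes.
        def leftSemiOutput(left: Relation, right: Relation): Seq[Attr] = left.output

        val l = Relation(Seq(Attr("l.a")))
        val r = Relation(Seq(Attr("r.b")))
        val visible = leftSemiOutput(l, r)
        // A Filter referencing r.b above the semi join cannot resolve it:
        assert(!visible.exists(_.name == "r.b"))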


---
