[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127710 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) --- End diff -- May I try to answer this? In this scenario the projection is only used for a left semi join that has been rewritten to a cross join, to ensure the output contains only left-side attributes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
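The projection described in this comment can be sketched with a small stand-alone toy model (plain Python, not Spark code; all names are illustrative): a left semi join exposes only the left side's columns, so once the plan is rewritten to a cross join plus a filter, a projection back to the left-side attributes is needed to restore that schema. The deduplication step below is only there to keep the toy faithful to semi-join semantics (each left row appears at most once).

```python
# Toy model (not Spark code) of the rewrite discussed above: a left semi
# join whose condition cannot run inside the join is replaced by a cross
# join, a filter, and a projection back to the left side's columns.

def cross_join(left, right):
    # Rows are dicts; a cross join concatenates columns from both sides.
    return [{**l, **r} for l in left for r in right]

def semi_join_via_cross(left, right, left_cols, cond):
    filtered = [row for row in cross_join(left, right) if cond(row)]
    # The Project: keep only left-side attributes. The dedup keeps this
    # toy faithful to semi-join semantics (each left row at most once).
    seen, out = set(), []
    for row in filtered:
        key = tuple(row[c] for c in left_cols)
        if key not in seen:
            seen.add(key)
            out.append({c: row[c] for c in left_cols})
    return out

left = [{"a": 1}, {"a": 2}]
right = [{"b": 1}, {"b": 3}]
result = semi_join_via_cross(left, right, ["a"], lambda r: r["a"] == r["b"])
print(result)  # prints [{'a': 1}]
# Without the projection, surviving rows would still carry column "b",
# which a left semi join must not expose.
```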
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127673 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // only need to add cross join when whole commonJoinCondition are unevaluable + val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { --- End diff -- Thanks. After checking in detail, I changed this to `others.nonEmpty`; the earlier check was probably an unnecessary worry about commonJoinCondition containing both unevaluable and evaluable conditions. I also added a test in the next commit to ensure this.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r216127605 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // CheckCartesianProducts, we throw firstly here for better readable information. + throw new AnalysisException("Detected the whole commonJoinCondition:" + +s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + +"join to cross join by setting the configuration variable " + +s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") +} + } + + /** + * Generate new left and right child of join by pushing down the side only join filter, + * split commonJoinCondition based on the expression can be evaluated within join or not. + * + * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin) --- End diff -- Got it, I just saw the example here https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1140; removed in the next commit.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215890040 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // only need to add cross join when whole commonJoinCondition are unevaluable + val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { --- End diff -- mmh... why do we have this check here, while later for the filter we check `others.nonEmpty`? Shouldn't they be the same?
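The distinction behind this question can be illustrated with a small stand-alone sketch (plain Python, not the actual Catalyst code; `evaluable` stands in for Spark's `canEvaluateWithinJoin`): `commonJoinCondition.nonEmpty && newJoinCond.isEmpty` holds only when every common condition is unevaluable, while `others.nonEmpty` holds as soon as one is, so the two checks disagree exactly when the condition mixes evaluable and unevaluable predicates.

```python
# Sketch of the two checks discussed in this thread. Conditions are plain
# strings; a condition mentioning "udf" plays the role of an unevaluable
# Python UDF predicate.

def split_conditions(conds, evaluable):
    # Mirrors commonJoinCondition.partition(canEvaluateWithinJoin).
    new_join_cond = [c for c in conds if evaluable(c)]
    others = [c for c in conds if not evaluable(c)]
    return new_join_cond, others

evaluable = lambda c: "udf" not in c
conds = ["x.a = y.a", "udf(x.b, y.b)"]  # mixed: one evaluable, one not
new_join_cond, others = split_conditions(conds, evaluable)

all_unevaluable = bool(conds) and not new_join_cond  # check quoted in the diff
any_unevaluable = bool(others)                       # check used for the filter
print(all_unevaluable, any_unevaluable)  # prints False True
```

With a fully unevaluable condition the two checks would agree; the mixed case above is where the difference shows.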
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215887767 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // CheckCartesianProducts, we throw firstly here for better readable information. + throw new AnalysisException("Detected the whole commonJoinCondition:" + +s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + +"join to cross join by setting the configuration variable " + +s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") +} + } + + /** + * Generate new left and right child of join by pushing down the side only join filter, + * split commonJoinCondition based on the expression can be evaluated within join or not. + * + * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin) --- End diff -- nit: this is not very useful; we can see that these are the names being returned...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215889232 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => - // push down the single side only join filter for both sides sub queries - val newLeft = leftJoinConditions. -reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) - val newRight = rightJoinConditions. -reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) +case LeftSemi => + val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin( +j, leftJoinConditions, rightJoinConditions, commonJoinCondition) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) --- End diff -- Before the patch we were not doing this projection. I am not sure why. cc @hvanhovell @davies who I see worked on this previously.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215877417 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala --- @@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest { "x.a".attr === Rand(10) && "y.b".attr === 5)) val correctAnswer = x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5), -condition = Some("x.a".attr === Rand(10))) +joinType = Cross).where("x.a".attr === Rand(10)) // CheckAnalysis will ensure nondeterministic expressions not appear in join condition. // TODO support nondeterministic expressions in join condition. -comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, - checkAnalysis = false) +withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze, +checkAnalysis = false) +} + } + + test("join condition pushdown: deterministic and non-deterministic in left semi join") { --- End diff -- I didn't add SPARK-25314 because this may just be a supplement to test("join condition pushdown: deterministic and non-deterministic").
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215876606 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => +case LeftSemi => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. --- End diff -- No problem, done in 87440b0.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215876550 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // [[CheckCartesianProducts]], we throw firstly here for better readable --- End diff -- Thanks, done in 87440b0. I'll also pay attention to this in future work.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215862215 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right) joinType match { -case _: InnerLike | LeftSemi => +case LeftSemi => // push down the single side only join filter for both sides sub queries val newLeft = leftJoinConditions. reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // need to add cross join when unevaluable condition exists + val newJoinType = if (others.nonEmpty) { +tryToGetCrossType(commonJoinCondition, j) + } else { +joinType + } - Join(newLeft, newRight, joinType, newJoinCond) + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join)) + } else { +join + } +case _: InnerLike => + // push down the single side only join filter for both sides sub queries + val newLeft = leftJoinConditions. --- End diff -- Can we deduplicate the code here?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215861685 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic) } + private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = { +if (SQLConf.get.crossJoinEnabled) { + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +"plan is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join. This plan shows below:\n $j") + Cross +} else { + // if the crossJoinEnabled is false, an AnalysisException will throw by + // [[CheckCartesianProducts]], we throw firstly here for better readable --- End diff -- tiny nit: we don't necessarily need `[[..]]` in inline comments. We can just leave it as is, or use `` `...` `` if you feel you should. Feel free to address this together with the other comments.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215271726 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- ``` I am a bit surprised that works, it would be great to understand why. Thanks. ``` Sorry for the bad test; it was too special and the result was right only by accident. The original implementation made all semi joins return `[]` in PySpark.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r215261610 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- Thanks @mgaido91 and @dilipbiswal ! I fix this in 63fbcce. 
The main problem is a semi join with both deterministic and non-deterministic conditions: a filter placed after the semi join will fail. I also added more tests on both the Python and Scala side, covering semi join, inner join, and the complex scenario described below. Handling left semi made the strategy difficult to read, so in 63fbcce I split the logic for semi join and inner join.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214991991 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- It means that in the left semi join the output of the Join operator should contain only the attributes from the left side, so attributes from the right side should not be referenced after the join. Therefore the plan should be invalid. I am a bit surprised that it works; it would be great to understand why. Thanks.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214974819 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- Thanks, I'll do more testing on the semi join here, but as currently tested over PySpark this is not wrong. Maybe I misunderstand what you two mean by `wrong`: do you mean a correctness issue, or just a benchmark regression? ![image](https://user-images.githubusercontent.com/4833765/45043269-4ba20c80-b09f-11e8-84dc-a1f3ff416303.png)
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214972475 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + + s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " + + "join to cross join by setting the configuration variable " + + s"${SQLConf.CROSS_JOINS_ENABLED.key}=true") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) { +Filter(others.reduceLeft(And), join) + } else { +join --- End diff -- This means that when we have a SemiJoin we are dropping the condition without doing anything with it. This is wrong. All this logic can be applied only to the Inner case; in the other cases this fix is wrong. Moreover, please add a UT to enforce correctness in the LeftSemi join case, so we can be sure that a wrong fix doesn't go in. Thanks.
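The concern in this comment can be demonstrated with a tiny stand-alone model (plain Python, not Spark code): a left semi join keeps a left row iff some right row satisfies the whole condition, so silently dropping part of that condition can let extra rows through.

```python
# Toy illustration of why dropping an unevaluable predicate from a left
# semi join is not a no-op: the result changes when that predicate would
# have rejected every matching right row.

def left_semi_join(left, right, cond):
    # Keep a left row iff some right row satisfies the full condition.
    return [l for l in left if any(cond(l, r) for r in right)]

left = [{"a": 1}, {"a": 2}]
right = [{"b": 1}]

full = left_semi_join(left, right, lambda l, r: l["a"] == r["b"] and l["a"] > 1)
dropped = left_semi_join(left, right, lambda l, r: l["a"] == r["b"])
print(full, dropped)  # prints [] [{'a': 1}]: dropping the predicate changed the result
```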
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214969191 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Filter(others.reduceLeft(And), join) --- End diff -- Thanks, no need to add extra Filter in LeftSemi case. 
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214968900 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + --- End diff -- Thanks, done in 82e50d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214968794 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") --- End diff -- Makes sense, also changed this in `CheckCartesianProducts`. Done in 82e50d5.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214962083 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- @mgaido91 > This is, indeed, arguable. I think that letting the user choose is a good idea. If the users runs the query and gets an AnalysisException because he/she is trying to perform a cartesian product, he/she can decide: ok, I am doing a wrong thing, let's change it; or he/she can say, well, one of my 2 tables involved contains 10 rows, not a big deal, I want to perform it nonetheless, let's set spark.sql.crossJoin.enabled=true and run it. Sounds reasonable .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214933474 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. 
+throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Filter(others.reduceLeft(And), join) --- End diff -- as pointed out by @dilipbiswal, this is correct only in the case of InnerJoin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214933787 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") --- End diff -- What about using the conf val in SQLConf? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214933586 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + --- End diff -- missing `s` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214932266 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join.") --- End diff -- Thanks, done in a86a7d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214931484 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join.") + Cross +} else joinType --- End diff -- Thanks, done in a86a7d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214857643 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- @dilipbiswal there are cases when "trivial conditions" are removed from a join so we make a inner join a cross one for instance. The performance would be awful, you're right. The point is that I am not sure that there is a better way to achieve this. I mean, since we have no clue what the UDF does, we need to compare all the rows from both sides, ie. we need to perform a cartesian product. > Wondering if we should error out or pick a bad plan This is, indeed, arguable. I think that letting the user choose is a good idea. 
If the user runs the query and gets an `AnalysisException` because they are trying to perform a cartesian product, they can decide: ok, I am doing the wrong thing, let's change it; or they can say, well, one of my 2 tables involved contains 10 rows, not a big deal, I want to perform it nonetheless, let's set `spark.sql.crossJoin.enabled=true` and run it. > for join types other than inner and leftsemi, we still have the same issue, no? I think the current PR properly handles only the inner case; for the left semi case, this PR returns an incorrect result IIUC. This needs to be fixed as well.
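The decision being debated can be sketched in plain Python (the names `plan_join`, `can_evaluate_within_join`, and `cross_join_enabled` are illustrative stand-ins, not Spark's actual API):

```python
def plan_join(common_conditions, can_evaluate_within_join, cross_join_enabled):
    # split the common condition into parts the join can evaluate and parts
    # it cannot (e.g. a Python UDF referencing attributes from both sides)
    evaluable = [c for c in common_conditions if can_evaluate_within_join(c)]
    unevaluable = [c for c in common_conditions if not can_evaluate_within_join(c)]

    if common_conditions and not evaluable:
        # every condition would be dropped from the join, so it degenerates
        # into a cartesian product; surface that to the user up front
        if not cross_join_enabled:
            raise ValueError(
                "whole join condition is unevaluable; set "
                "spark.sql.crossJoin.enabled=true to allow a cross join")
        join_type = "Cross"
    else:
        join_type = "Inner"

    # unevaluable parts are applied later as a Filter above the join
    return join_type, evaluable, unevaluable

jt, ev, un = plan_join(["udf(a, c) = udf(d, c)"], lambda c: False, True)
assert (jt, ev, un) == ("Cross", [], ["udf(a, c) = udf(d, c)"])
```

This mirrors the `commonJoinCondition.nonEmpty && newJoinCond.isEmpty` check in the diff above, leaving the user the choice between an explicit error and an explicit opt-in to the cartesian product.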
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214841211 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- ``` yes, and if the plan is big, then this would become quite unreadable IMHO. I think it would be better to refactor the message and put the plan at the end. ``` @mgaido91 Thanks for your advice, I will do the refactor in the next commit.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214840994 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- @dilipbiswal Thanks for your detailed check; I should make the test case more typical. The case we want to solve here is a UDF accessing attributes from both sides, so I'll change the case to `dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c"))` in the next commit.
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214840118 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- @mgaido91 Thanks. Marco, do you know if there are instances where we pick a cross join implicitly? It wouldn't perform very well, right? I'm wondering if we should error out or pick a bad plan. I guess I am not sure what's the right thing to do here.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214834311 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- @dilipbiswal I haven't checked the particular plan posted in that comment, for which I think you are right, we can handle as you suggested, but I was checking the case in the UT and in the description of this PR, ie. when the input for the Python UDF contains attributes from both sides. In that case I don't have a better suggestion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214828964 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- @xuanyuanking @mgaido91 In the above example, the UDFs refer to attributes from distinct legs of the join. Can we not plan this better than a cross join in this case? I am wondering why we can't do - ``` Join Inner, leftAlias1 = rightAlias1 Project dummyUDF(a, b) as leftAlias1 LocalRelation(a, b) Project dummyUDF(c, d) as rightAlias1 LocalRelation(c, d) ``` Perhaps I am missing something.
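The alternative plan suggested above can be simulated in plain Python: when each UDF reads attributes from only one side, its value can be projected per side first and the join becomes an ordinary equi-join on the projected columns, with no cartesian product. (This does not help when a single UDF mixes attributes from both sides, which is the case this PR targets; `dummy_udf` is a hypothetical stand-in.)

```python
left = [(1, 2), (3, 4)]        # rows (a, b) of the left relation
right = [(1, 2), (5, 6)]       # rows (c, d) of the right relation

def dummy_udf(x, y):
    # hypothetical UDF that only reads columns from one side at a time
    return x + y

# project the UDF result on each side, then equi-join on the projected value
left_proj = [(row, dummy_udf(*row)) for row in left]
right_proj = [(row, dummy_udf(*row)) for row in right]
joined = [(l, r) for l, lk in left_proj for r, rk in right_proj if lk == rk]

assert joined == [((1, 2), (1, 2))]
```

The pre-projection trick works precisely because each key depends on a single side; a condition like `dummyUDF(a, c)` cannot be split this way.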
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214824642 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- yes, and if the plan is big, then this would become quite unreadable IMHO. I think it would be better to refactor the message and put the plan at the end.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214823799 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- The log will be shown like this: ``` 16:13:35.218 WARN org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin: The whole commonJoinCondition:List((dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))) of the join plan: Join Inner, (dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14)) :- LocalRelation [a#5, b#6] +- LocalRelation [c#14, d#15] is unevaluable, it will be ignored and the join plan will be turned to cross join. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214823548 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { --- End diff -- Thanks for the reminder; crossJoinEnabled should be checked here.
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214818536

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
         val newRight = rightJoinConditions.
           reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-        val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-        Join(newLeft, newRight, joinType, newJoinCond)
+        val (newJoinConditions, others) =
+          commonJoinCondition.partition(canEvaluateWithinJoin)
+        val newJoinCond = newJoinConditions.reduceLeftOption(And)
+        // if condition expression is unevaluable, it will be removed from
+        // the new join conditions, if all conditions is unevaluable, we should
+        // change the join type to CrossJoin.
+        val newJoinType =
+          if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+            logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+              s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
```
--- End diff --

Not sure that inlining the plan here makes this warning very readable...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214818227

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
         val newRight = rightJoinConditions.
           reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-        val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-        Join(newLeft, newRight, joinType, newJoinCond)
+        val (newJoinConditions, others) =
+          commonJoinCondition.partition(canEvaluateWithinJoin)
+        val newJoinCond = newJoinConditions.reduceLeftOption(And)
+        // if condition expression is unevaluable, it will be removed from
+        // the new join conditions, if all conditions is unevaluable, we should
+        // change the join type to CrossJoin.
+        val newJoinType =
+          if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
```
--- End diff --

I think this should be done only in this case: https://github.com/apache/spark/pull/22326#discussion_r214813591
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214818719

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
         val newRight = rightJoinConditions.
           reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-        val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-        Join(newLeft, newRight, joinType, newJoinCond)
+        val (newJoinConditions, others) =
+          commonJoinCondition.partition(canEvaluateWithinJoin)
+        val newJoinCond = newJoinConditions.reduceLeftOption(And)
+        // if condition expression is unevaluable, it will be removed from
+        // the new join conditions, if all conditions is unevaluable, we should
+        // change the join type to CrossJoin.
+        val newJoinType =
+          if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+            logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+              s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+              s"turned to cross join.")
+            Cross
+          } else joinType
```
--- End diff --

```
} else {
  joinType
}
```
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214818432

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
         val newRight = rightJoinConditions.
           reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-        val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-        Join(newLeft, newRight, joinType, newJoinCond)
+        val (newJoinConditions, others) =
+          commonJoinCondition.partition(canEvaluateWithinJoin)
+        val newJoinCond = newJoinConditions.reduceLeftOption(And)
+        // if condition expression is unevaluable, it will be removed from
+        // the new join conditions, if all conditions is unevaluable, we should
+        // change the join type to CrossJoin.
+        val newJoinType =
+          if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+            logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+              s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+              s"turned to cross join.")
```
--- End diff --

The `s` interpolator should be removed here (that string contains nothing to interpolate).
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214813591

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
         val newRight = rightJoinConditions.
           reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-        val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-        Join(newLeft, newRight, joinType, newJoinCond)
+        val (newJoinConditions, others) =
+          commonJoinCondition.partition(canEvaluateWithinJoin)
+        val newJoinCond = newJoinConditions.reduceLeftOption(And)
+        // if condition expression is unevaluable, it will be removed from
+        // the new join conditions, if all conditions is unevaluable, we should
+        // change the join type to CrossJoin.
+        val newJoinType =
+          if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
```
--- End diff --

What about also checking `spark.sql.crossJoin.enabled` and allowing the transformation only in that case?
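The suggested guard could look roughly like the sketch below (pure Python with an invented `cross_join_enabled` flag standing in for `spark.sql.crossJoin.enabled`; not Spark's implementation): rewrite to a cross join only when the configuration permits implicit cartesian products, otherwise fail analysis.

```python
# Hypothetical sketch of gating the cross-join fallback on configuration.
def choose_join_type(join_type, common_conditions, evaluable_conditions,
                     cross_join_enabled):
    """Turn the join into a cross join only when the config permits it."""
    if common_conditions and not evaluable_conditions:
        if not cross_join_enabled:
            # Mirrors Spark refusing an implicit cartesian product when
            # spark.sql.crossJoin.enabled is false.
            raise ValueError("cartesian product required but "
                             "spark.sql.crossJoin.enabled is false")
        return "Cross"
    return join_type
```

This keeps the silent plan change opt-in: users who have not enabled cross joins get an explicit error instead of an unexpectedly expensive cartesian product.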
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214807566

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
```
@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("Python UDF refers to the attributes from more than one child in join condition") {
```
--- End diff --

Got it, will add it in this commit.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214807480

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
         val newRight = rightJoinConditions.
           reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-        val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-        Join(newLeft, newRight, joinType, newJoinCond)
+        val (newJoinConditions, others) =
+          commonJoinCondition.partition(canEvaluateWithinJoin)
+        val newJoinCond = newJoinConditions.reduceLeftOption(And)
+        // if condition expression is unevaluable, it will be removed from
+        // the new join conditions, if all conditions is unevaluable, we should
+        // change the join type to CrossJoin.
+        val newJoinType =
+          if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
```
--- End diff --

Makes sense, I'll leave a warning here.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214807200

--- Diff: python/pyspark/sql/tests.py ---
```
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.crossJoin(right).filter(f("a", "b"))
```
--- End diff --

Ditto, the correct test is `df = left.join(right, f("a", "b"))`.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214806996

--- Diff: python/pyspark/sql/tests.py ---
```
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.crossJoin(right).filter(f("a", "b"))
+        self.assertEqual(df.collect(), [Row(a=1, b=1)])
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
```
--- End diff --

Yep, sorry for the mess here; a leftover commit slipped in. I'll fix it soon. How do I cancel the test run?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214805817

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
```
@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("Python UDF refers to the attributes from more than one child in join condition") {
```
--- End diff --

I would add a JIRA prefix here - this change sounds more like fixing a particular problem.
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214805785

--- Diff: python/pyspark/sql/tests.py ---
```
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
```
--- End diff --

Sorry for the mess here, I'm changing it.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214805695

--- Diff: python/pyspark/sql/tests.py ---
```
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.crossJoin(right).filter(f("a", "b"))
```
--- End diff --

BTW, why do we explicitly test cross join here?
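The two spellings in the test are close precisely because a join whose only condition is a Python UDF ends up executed as a cartesian product followed by a filter on the UDF result. A plain-Python analogue of that evaluation order (no Spark involved; `f` stands in for the UDF):

```python
from itertools import product

left = [{"a": 1}]
right = [{"b": 1}, {"b": 2}]
f = lambda a, b: a == b  # stand-in for the Python UDF join condition

# A join whose only condition is an unevaluable UDF is executed as:
# cross join (cartesian product) + filter on the UDF result.
joined = [{**l, **r} for l, r in product(left, right) if f(l["a"], r["b"])]
# joined == [{"a": 1, "b": 1}]
```

That equivalence is the point of the optimizer change under review; the regression test just needs to exercise the `left.join(right, f(...))` spelling so the rewrite path is actually hit.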
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214805611

--- Diff: python/pyspark/sql/tests.py ---
```
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
         right = self.spark.createDataFrame([Row(b=1)])
         f = udf(lambda a, b: a == b, BooleanType())
         df = left.crossJoin(right).filter(f("a", "b"))
+
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.crossJoin(right).filter(f("a", "b"))
+        self.assertEqual(df.collect(), [Row(a=1, b=1)])
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
```
--- End diff --

Looks duplicated.
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214805020

--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
```
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
           reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
         val newRight = rightJoinConditions.
           reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-        val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-        Join(newLeft, newRight, joinType, newJoinCond)
+        val (newJoinConditions, others) =
+          commonJoinCondition.partition(canEvaluateWithinJoin)
+        val newJoinCond = newJoinConditions.reduceLeftOption(And)
+        // if condition expression is unevaluable, it will be removed from
+        // the new join conditions, if all conditions is unevaluable, we should
+        // change the join type to CrossJoin.
+        val newJoinType =
+          if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
```
--- End diff --

I think we should at least warn, or leave a note, that an unevaluable condition (such as a Python UDF) in the join condition will be ignored and the join turned into a cross join.
GitHub user xuanyuanking opened a pull request: https://github.com/apache/spark/pull/22326

[SPARK-25314][SQL] Fix Python UDF accessing attributes from both sides of a join in join conditions

## What changes were proposed in this pull request?

Thanks to @bahchis for reporting this. This is a follow-up to #16581; this PR fixes the scenario of a Python UDF accessing attributes from both sides of a join in the join condition.

## How was this patch tested?

Added regression tests in PySpark and `BatchEvalPythonExecSuite`.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuanyuanking/spark SPARK-25314

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22326.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22326

commit 23b10283500b18462a71eb525d4762dd33c3d4fa
Author: Yuanjian Li
Date: 2018-09-04T06:43:14Z

    Fix Python UDF accessing attibutes from both side of join in join conditions