[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127710
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

       split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

       joinType match {
-        case _: InnerLike | LeftSemi =>
-          // push down the single side only join filter for both sides sub queries
-          val newLeft = leftJoinConditions.
-            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-          val newRight = rightJoinConditions.
-            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+        case LeftSemi =>
+          val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
+            j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
+          // need to add cross join when unevaluable condition exists
+          val newJoinType = if (others.nonEmpty) {
+            tryToGetCrossType(commonJoinCondition, j)
+          } else {
+            joinType
+          }

-          Join(newLeft, newRight, joinType, newJoinCond)
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty) {
+            Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
--- End diff --

Could I try to answer this? In this scenario the projection is only added when a left semi join is rewritten to a cross join, to ensure the output contains only left-side attributes.
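For illustration, here is a minimal, runnable model of the rewrite being discussed (plain Python, not Spark code; `cross_join`, `filter_rows`, and `project` are hypothetical stand-ins for the Catalyst operators): a left semi join whose condition cannot be evaluated inside the join (e.g. a Python UDF) is planned as cross join + Filter, and the trailing Project keeps only the left side's attributes.

```python
def cross_join(left, right):
    # each row is a dict; cross join concatenates every pair of rows
    return [{**l, **r} for l in left for r in right]

def filter_rows(rows, predicate):
    return [r for r in rows if predicate(r)]

def project(rows, attrs):
    return [{k: r[k] for k in attrs} for r in rows]

left = [{"x.a": 1}, {"x.a": 2}]
right = [{"y.b": 1}, {"y.b": 3}]

# stand-in for an unevaluable condition such as a Python UDF comparing columns
udf_cond = lambda row: row["x.a"] == row["y.b"]

rewritten = project(
    filter_rows(cross_join(left, right), udf_cond),
    attrs=["x.a"],  # only left-side attributes survive, as in left semi join
)
print(rewritten)  # -> [{'x.a': 1}]
```

Without the projection, right-side columns like `y.b` would leak into the output, which a left semi join must never expose.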


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127673
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

       split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

       joinType match {
-        case _: InnerLike | LeftSemi =>
-          // push down the single side only join filter for both sides sub queries
-          val newLeft = leftJoinConditions.
-            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-          val newRight = rightJoinConditions.
-            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+        case LeftSemi =>
+          val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
+            j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
+          // need to add cross join when unevaluable condition exists
+          val newJoinType = if (others.nonEmpty) {
+            tryToGetCrossType(commonJoinCondition, j)
+          } else {
+            joinType
+          }

-          Join(newLeft, newRight, joinType, newJoinCond)
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty) {
+            Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
+          } else {
+            join
+          }
+        case _: InnerLike =>
+          val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
+            j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
+          // only need to add cross join when whole commonJoinCondition are unevaluable
+          val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
--- End diff --

Thanks. After checking in detail, I changed this to `others.nonEmpty`; the previous check was an unnecessary worry about commonJoinCondition containing both unevaluable and evaluable conditions. I also added a test in the next commit to ensure this.
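To make the distinction concrete, here is a small runnable sketch (plain Python; the `partition` helper and UDF detection are illustrative, not Spark's `canEvaluateWithinJoin`): the conjuncts of the common join condition are split into evaluable ones and `others`, and in a mixed case `others` is non-empty even though the new join condition is not, which is why `others.nonEmpty` is the safer check.

```python
def partition(conds, can_evaluate_within_join):
    # split conjuncts by whether the join itself can evaluate them
    evaluable, others = [], []
    for c in conds:
        (evaluable if can_evaluate_within_join(c) else others).append(c)
    return evaluable, others

# one ordinary equi-join conjunct plus one Python-UDF conjunct
common_join_condition = ["x.a = y.a", "pyudf(x.b) = y.b"]

new_join_conditions, others = partition(
    common_join_condition, lambda c: "pyudf" not in c)
print(new_join_conditions, others)  # -> ['x.a = y.a'] ['pyudf(x.b) = y.b']
```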


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127605
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
     (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
   }

+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = {
+    if (SQLConf.get.crossJoinEnabled) {
+      // if condition expression is unevaluable, it will be removed from
+      // the new join conditions, if all conditions is unevaluable, we should
+      // change the join type to CrossJoin.
+      logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+        "plan is unevaluable, it will be ignored and the join plan will be " +
+        s"turned to cross join. This plan shows below:\n $j")
+      Cross
+    } else {
+      // if the crossJoinEnabled is false, an AnalysisException will throw by
+      // CheckCartesianProducts, we throw firstly here for better readable information.
+      throw new AnalysisException("Detected the whole commonJoinCondition:" +
+        s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+        "join to cross join by setting the configuration variable " +
+        s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+    }
+  }
+
+  /**
+   * Generate new left and right child of join by pushing down the side only join filter,
+   * split commonJoinCondition based on the expression can be evaluated within join or not.
+   *
+   * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin)
--- End diff --

Got it, I just saw the demo here 
https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1140; removed in the next commit.
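As an aside, the control flow of `tryToGetCrossType` quoted above can be sketched in a few lines of plain Python (illustrative only; `try_to_get_cross_type` and its error message are simplified stand-ins, not Spark code): either fall back to a cross join when the cross-join config is enabled, or fail fast with a readable error instead of waiting for CheckCartesianProducts.

```python
def try_to_get_cross_type(common_join_condition, cross_join_enabled):
    if cross_join_enabled:
        # mirrors the logWarning branch: the unevaluable condition is dropped
        # from the join and the join type becomes Cross
        return "Cross"
    # mirrors the AnalysisException branch: fail early with actionable advice
    raise ValueError(
        "Detected the whole commonJoinCondition:%s of the join plan is "
        "unevaluable; set spark.sql.crossJoin.enabled=true to allow the "
        "cross join" % common_join_condition)

print(try_to_get_cross_type(["pyudf(a) = b"], cross_join_enabled=True))  # -> Cross
try:
    try_to_get_cross_type(["pyudf(a) = b"], cross_join_enabled=False)
except ValueError as e:
    print("raised:", type(e).__name__)  # -> raised: ValueError
```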


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215890040
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

       split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

       joinType match {
-        case _: InnerLike | LeftSemi =>
-          // push down the single side only join filter for both sides sub queries
-          val newLeft = leftJoinConditions.
-            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-          val newRight = rightJoinConditions.
-            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+        case LeftSemi =>
+          val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
+            j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
+          // need to add cross join when unevaluable condition exists
+          val newJoinType = if (others.nonEmpty) {
+            tryToGetCrossType(commonJoinCondition, j)
+          } else {
+            joinType
+          }

-          Join(newLeft, newRight, joinType, newJoinCond)
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty) {
+            Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
+          } else {
+            join
+          }
+        case _: InnerLike =>
+          val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
+            j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
+          // only need to add cross join when whole commonJoinCondition are unevaluable
+          val newJoinType = if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
--- End diff --

mmh... why do we have this check here, while later for the filter we check 
`others.nonEmpty`? Shouldn't they be the same?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215887767
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1149,6 +1149,47 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
     (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
   }

+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = {
+    if (SQLConf.get.crossJoinEnabled) {
+      // if condition expression is unevaluable, it will be removed from
+      // the new join conditions, if all conditions is unevaluable, we should
+      // change the join type to CrossJoin.
+      logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+        "plan is unevaluable, it will be ignored and the join plan will be " +
+        s"turned to cross join. This plan shows below:\n $j")
+      Cross
+    } else {
+      // if the crossJoinEnabled is false, an AnalysisException will throw by
+      // CheckCartesianProducts, we throw firstly here for better readable information.
+      throw new AnalysisException("Detected the whole commonJoinCondition:" +
+        s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+        "join to cross join by setting the configuration variable " +
+        s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+    }
+  }
+
+  /**
+   * Generate new left and right child of join by pushing down the side only join filter,
+   * split commonJoinCondition based on the expression can be evaluated within join or not.
+   *
+   * @return (newLeftChild, newRightChild, newJoinCondition, conditionCannotEvaluateWithinJoin)
--- End diff --

nit: this is not very useful, we can see that these are the names 
returned...


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215889232
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1202,15 +1243,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

       split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

       joinType match {
-        case _: InnerLike | LeftSemi =>
-          // push down the single side only join filter for both sides sub queries
-          val newLeft = leftJoinConditions.
-            reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
-          val newRight = rightJoinConditions.
-            reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+        case LeftSemi =>
+          val (newLeft, newRight, newJoinCond, others) = getNewChildAndSplitCondForJoin(
+            j, leftJoinConditions, rightJoinConditions, commonJoinCondition)
+          // need to add cross join when unevaluable condition exists
+          val newJoinType = if (others.nonEmpty) {
+            tryToGetCrossType(commonJoinCondition, j)
+          } else {
+            joinType
+          }

-          Join(newLeft, newRight, joinType, newJoinCond)
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty) {
+            Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
--- End diff --

Before the patch we were not doing this projection. I am not sure why.

cc @hvanhovell @davies who I see worked on this previously.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215877417
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
       "x.a".attr === Rand(10) && "y.b".attr === 5))
     val correctAnswer =
       x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && "y.b".attr === 5),
-        condition = Some("x.a".attr === Rand(10)))
+        joinType = Cross).where("x.a".attr === Rand(10))

     // CheckAnalysis will ensure nondeterministic expressions not appear in join condition.
     // TODO support nondeterministic expressions in join condition.
-    comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
-      checkAnalysis = false)
+    withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
+      comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze,
+        checkAnalysis = false)
+    }
+  }
+
+  test("join condition pushdown: deterministic and non-deterministic in left semi join") {
--- End diff --

I didn't add SPARK-25314 to the test name because it may be a supplement to test("join condition pushdown: deterministic and non-deterministic").


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215876606
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

       split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

       joinType match {
-        case _: InnerLike | LeftSemi =>
+        case LeftSemi =>
           // push down the single side only join filter for both sides sub queries
           val newLeft = leftJoinConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+          val (newJoinConditions, others) =
+            commonJoinCondition.partition(canEvaluateWithinJoin)
+          val newJoinCond = newJoinConditions.reduceLeftOption(And)
+          // need to add cross join when unevaluable condition exists
+          val newJoinType = if (others.nonEmpty) {
+            tryToGetCrossType(commonJoinCondition, j)
+          } else {
+            joinType
+          }

-          Join(newLeft, newRight, joinType, newJoinCond)
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty) {
+            Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
+          } else {
+            join
+          }
+        case _: InnerLike =>
+          // push down the single side only join filter for both sides sub queries
+          val newLeft = leftJoinConditions.
--- End diff --

No problem, done in 87440b0.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215876550
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
     (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
   }

+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = {
+    if (SQLConf.get.crossJoinEnabled) {
+      // if condition expression is unevaluable, it will be removed from
+      // the new join conditions, if all conditions is unevaluable, we should
+      // change the join type to CrossJoin.
+      logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+        "plan is unevaluable, it will be ignored and the join plan will be " +
+        s"turned to cross join. This plan shows below:\n $j")
+      Cross
+    } else {
+      // if the crossJoinEnabled is false, an AnalysisException will throw by
+      // [[CheckCartesianProducts]], we throw firstly here for better readable
--- End diff --

Thanks, done in 87440b0. I'll also pay attention to this in future work.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215862215
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1202,15 +1222,50 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {

       split(joinCondition.map(splitConjunctivePredicates).getOrElse(Nil), left, right)

       joinType match {
-        case _: InnerLike | LeftSemi =>
+        case LeftSemi =>
           // push down the single side only join filter for both sides sub queries
           val newLeft = leftJoinConditions.
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
+          val (newJoinConditions, others) =
+            commonJoinCondition.partition(canEvaluateWithinJoin)
+          val newJoinCond = newJoinConditions.reduceLeftOption(And)
+          // need to add cross join when unevaluable condition exists
+          val newJoinType = if (others.nonEmpty) {
+            tryToGetCrossType(commonJoinCondition, j)
+          } else {
+            joinType
+          }

-          Join(newLeft, newRight, joinType, newJoinCond)
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty) {
+            Project(newLeft.output.map(_.toAttribute), Filter(others.reduceLeft(And), join))
+          } else {
+            join
+          }
+        case _: InnerLike =>
+          // push down the single side only join filter for both sides sub queries
+          val newLeft = leftJoinConditions.
--- End diff --

Can we deduplicate the code here?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-07 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215861685
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1149,6 +1149,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
     (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
   }

+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = {
+    if (SQLConf.get.crossJoinEnabled) {
+      // if condition expression is unevaluable, it will be removed from
+      // the new join conditions, if all conditions is unevaluable, we should
+      // change the join type to CrossJoin.
+      logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+        "plan is unevaluable, it will be ignored and the join plan will be " +
+        s"turned to cross join. This plan shows below:\n $j")
+      Cross
+    } else {
+      // if the crossJoinEnabled is false, an AnalysisException will throw by
+      // [[CheckCartesianProducts]], we throw firstly here for better readable
--- End diff --

tiny nit: we don't necessarily need `[[..]]` in inline comments. We can just 
leave it as is, or use ``` `...` ``` if you feel you should. Feel free to 
address this with the other comments.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215271726
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-          Join(newLeft, newRight, joinType, newJoinCond)
+          val (newJoinConditions, others) =
+            commonJoinCondition.partition(canEvaluateWithinJoin)
+          val newJoinCond = newJoinConditions.reduceLeftOption(And)
+          val newJoinType =
+            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+              if (SQLConf.get.crossJoinEnabled) {
+                // if condition expression is unevaluable, it will be removed from
+                // the new join conditions, if all conditions is unevaluable, we should
+                // change the join type to CrossJoin.
+                logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+                  "plan is unevaluable, it will be ignored and the join plan will be " +
+                  s"turned to cross join. This plan shows below:\n $j")
+                Cross
+              } else {
+                // if the crossJoinEnabled is false, an AnalysisException will throw by
+                // [[CheckCartesianProducts]], we throw firstly here for better readable
+                // information.
+                throw new AnalysisException("Detected the whole commonJoinCondition:" +
+                  s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+                  "join to cross join by setting the configuration variable " +
+                  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+              }
+            } else {
+              joinType
+            }
+
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+            Filter(others.reduceLeft(And), join)
+          } else {
+            join
--- End diff --

```
I am a bit surprised that works, it would be great to understand why. Thanks.
```
Sorry for the bad test; it was too special a case and the result was right only by accident. The original implementation makes all semi joins return `[]` in PySpark.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-05 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r215261610
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-          Join(newLeft, newRight, joinType, newJoinCond)
+          val (newJoinConditions, others) =
+            commonJoinCondition.partition(canEvaluateWithinJoin)
+          val newJoinCond = newJoinConditions.reduceLeftOption(And)
+          val newJoinType =
+            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+              if (SQLConf.get.crossJoinEnabled) {
+                // if condition expression is unevaluable, it will be removed from
+                // the new join conditions, if all conditions is unevaluable, we should
+                // change the join type to CrossJoin.
+                logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+                  "plan is unevaluable, it will be ignored and the join plan will be " +
+                  s"turned to cross join. This plan shows below:\n $j")
+                Cross
+              } else {
+                // if the crossJoinEnabled is false, an AnalysisException will throw by
+                // [[CheckCartesianProducts]], we throw firstly here for better readable
+                // information.
+                throw new AnalysisException("Detected the whole commonJoinCondition:" +
+                  s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+                  "join to cross join by setting the configuration variable " +
+                  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+              }
+            } else {
+              joinType
+            }
+
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+            Filter(others.reduceLeft(And), join)
+          } else {
+            join
--- End diff --

Thanks @mgaido91 and @dilipbiswal !
I fixed this in 63fbcce. The main problem is a semi join with both deterministic and non-deterministic conditions: a filter placed after the semi join will fail. I also added more tests on both the Python and Scala side, covering semi join, inner join, and the complex scenario described below.
Handling left semi made the single strategy difficult to read, so in 63fbcce I split the logic for semi join and inner join.
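The failure mode described above can be sketched with a toy model (plain Python, not Spark; `left_semi_join` is a hypothetical stand-in): the output of a left semi join carries only left-side attributes, so a filter placed after the semi join has nothing to bind a right-side column to.

```python
def left_semi_join(left, right, on):
    # return each left row that has at least one match on the right;
    # right-side columns are not part of the output
    return [l for l in left if any(on(l, r) for r in right)]

left = [{"x.a": 1}, {"x.a": 2}]
right = [{"y.b": 1}]

semi = left_semi_join(left, right, on=lambda l, r: l["x.a"] == r["y.b"])
print(semi)  # -> [{'x.a': 1}]

# a post-join predicate over y.b cannot be evaluated: y.b is gone
try:
    [row for row in semi if row["y.b"] == 1]
except KeyError as e:
    print("unresolvable attribute:", e)  # -> unresolvable attribute: 'y.b'
```

This is why the semi-join and inner-join branches need different handling: for inner joins the right-side attributes are still available after the join, so the leftover filter can simply be placed on top.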


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214991991
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-          Join(newLeft, newRight, joinType, newJoinCond)
+          val (newJoinConditions, others) =
+            commonJoinCondition.partition(canEvaluateWithinJoin)
+          val newJoinCond = newJoinConditions.reduceLeftOption(And)
+          val newJoinType =
+            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+              if (SQLConf.get.crossJoinEnabled) {
+                // if condition expression is unevaluable, it will be removed from
+                // the new join conditions, if all conditions is unevaluable, we should
+                // change the join type to CrossJoin.
+                logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+                  "plan is unevaluable, it will be ignored and the join plan will be " +
+                  s"turned to cross join. This plan shows below:\n $j")
+                Cross
+              } else {
+                // if the crossJoinEnabled is false, an AnalysisException will throw by
+                // [[CheckCartesianProducts]], we throw firstly here for better readable
+                // information.
+                throw new AnalysisException("Detected the whole commonJoinCondition:" +
+                  s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+                  "join to cross join by setting the configuration variable " +
+                  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+              }
+            } else {
+              joinType
+            }
+
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+            Filter(others.reduceLeft(And), join)
+          } else {
+            join
--- End diff --

It means that in the left semi join the output of the Join operator should 
contain only the attributes from the left side, so attributes from the right 
side should not be referenced after the join. Therefore the plan should be 
invalid. I am a bit surprised that it works; it would be great to understand why. 
Thanks.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214974819
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-          Join(newLeft, newRight, joinType, newJoinCond)
+          val (newJoinConditions, others) =
+            commonJoinCondition.partition(canEvaluateWithinJoin)
+          val newJoinCond = newJoinConditions.reduceLeftOption(And)
+          val newJoinType =
+            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+              if (SQLConf.get.crossJoinEnabled) {
+                // if condition expression is unevaluable, it will be removed from
+                // the new join conditions, if all conditions is unevaluable, we should
+                // change the join type to CrossJoin.
+                logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+                  "plan is unevaluable, it will be ignored and the join plan will be " +
+                  s"turned to cross join. This plan shows below:\n $j")
+                Cross
+              } else {
+                // if the crossJoinEnabled is false, an AnalysisException will throw by
+                // [[CheckCartesianProducts]], we throw firstly here for better readable
+                // information.
+                throw new AnalysisException("Detected the whole commonJoinCondition:" +
+                  s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+                  "join to cross join by setting the configuration variable " +
+                  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+              }
+            } else {
+              joinType
+            }
+
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+            Filter(others.reduceLeft(And), join)
+          } else {
+            join
--- End diff --

Thanks, I'll do more testing on the semi join here, but as currently tested over 
PySpark this is not wrong. Maybe I misunderstood what you two mean by `wrong`: 
does it mean a correctness issue, or just a benchmark regression?

![image](https://user-images.githubusercontent.com/4833765/45043269-4ba20c80-b09f-11e8-84dc-a1f3ff416303.png)



---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214972475
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
             reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
           val newRight = rightJoinConditions.
             reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-          val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-          Join(newLeft, newRight, joinType, newJoinCond)
+          val (newJoinConditions, others) =
+            commonJoinCondition.partition(canEvaluateWithinJoin)
+          val newJoinCond = newJoinConditions.reduceLeftOption(And)
+          val newJoinType =
+            if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+              if (SQLConf.get.crossJoinEnabled) {
+                // if condition expression is unevaluable, it will be removed from
+                // the new join conditions, if all conditions is unevaluable, we should
+                // change the join type to CrossJoin.
+                logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+                  "plan is unevaluable, it will be ignored and the join plan will be " +
+                  s"turned to cross join. This plan shows below:\n $j")
+                Cross
+              } else {
+                // if the crossJoinEnabled is false, an AnalysisException will throw by
+                // [[CheckCartesianProducts]], we throw firstly here for better readable
+                // information.
+                throw new AnalysisException("Detected the whole commonJoinCondition:" +
+                  s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+                  "join to cross join by setting the configuration variable " +
+                  s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+              }
+            } else {
+              joinType
+            }
+
+          val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+          if (others.nonEmpty && joinType.isInstanceOf[InnerLike]) {
+            Filter(others.reduceLeft(And), join)
+          } else {
+            join
--- End diff --

this means that when we have a SemiJoin we are dropping the condition without 
doing anything with it. This is wrong. All this logic can be applied only to the 
Inner case; in the other cases, this fix is wrong. Moreover, please add a UT to 
enforce correctness in the LeftSemi join case, so we can be sure that a 
wrong fix doesn't go in. Thanks.
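
A minimal Python sketch of the point above (toy tuples, not Spark; `cond` is a hypothetical stand-in for the unevaluable UDF condition) showing that a plain Filter over the cross join is only equivalent to the original join for the inner case, not for a left semi join:

```python
# Toy relations: rows are (key, value) tuples; plain Python, not Spark.
left = [(1, "a"), (2, "b")]
right = [(1, "x"), (1, "y"), (2, "z")]

def cond(l, r):
    # Stand-in for an unevaluable join condition (e.g. a Python UDF
    # reading attributes from both sides).
    return l[1] + r[1] in ("ax", "ay", "bz")

def semi_join(left, right, pred):
    # LEFT SEMI JOIN semantics: each left row is emitted at most once,
    # if at least one right row matches.
    return [l for l in left if any(pred(l, r) for r in right)]

def cross_then_filter(left, right, pred):
    # Cross join followed by a Filter: one output row per matching pair.
    return [(l, r) for l in left for r in right if pred(l, r)]

semi = semi_join(left, right, cond)
inner = cross_then_filter(left, right, cond)

# (1, "a") matches two right rows, so the filtered cross join duplicates it;
# a bare Filter over the cross join is NOT equivalent to the semi join.
# Restoring equivalence needs a projection to the left side's output plus
# de-duplication, which is what the Project in the proposed fix provides.
semi_via_cross = sorted({l for (l, _) in inner})
```

Under these toy semantics, `semi_via_cross` matches `semi` only after the project/dedup step; `inner` alone has an extra row.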


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214969191
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Filter(others.reduceLeft(And), join)
--- End diff --

Thanks, no need to add an extra Filter in the LeftSemi case.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214968900
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
--- End diff --

Thanks, done in 82e50d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214968794
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
--- End diff --

Make sense, also change this in `CheckCartesianProducts`. Done in 82e50d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214962083
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

@mgaido91 
> This is, indeed, arguable. I think that letting the user choose is a good 
idea. If the users runs the query and gets an AnalysisException because he/she 
is trying to perform a cartesian product, he/she can decide: ok, I am doing a 
wrong thing, let's change it; or he/she can say, well, one of my 2 tables 
involved contains 10 rows, not a big deal, I want to perform it nonetheless, 
let's set spark.sql.crossJoin.enabled=true and run it.

Sounds reasonable ..


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214933474
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Filter(others.reduceLeft(And), join)
--- End diff --

as pointed out by @dilipbiswal, this is correct only in the case of 
InnerJoin


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214933787
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
+  " join to cross join by setting the configuration 
variable" +
+  " spark.sql.crossJoin.enabled = true.")
--- End diff --

What about using the conf val in SQLConf?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214933586
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be 
removed from
+// the new join conditions, if all conditions is 
unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join 
plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException 
will throw by
+// [[CheckCartesianProducts]], we throw firstly here for 
better readable
+// information.
+throw new AnalysisException("Detected the whole 
commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, 
we need to cast the" +
--- End diff --

missing `s`


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214932266
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
+s"turned to cross join.")
--- End diff --

Thanks, done in a86a7d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214931484
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
+s"turned to cross join.")
+  Cross
+} else joinType
--- End diff --

Thanks, done in a86a7d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214857643
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

@dilipbiswal there are cases when "trivial conditions" are removed from a 
join, so we turn an inner join into a cross join, for instance. The performance 
would be awful, you're right. The point is that I am not sure there is a better 
way to achieve this. I mean, since we have no clue what the UDF does, we need to 
compare all the rows from both sides, i.e. we need to perform a cartesian 
product.

> Wondering if we should error out or pick a bad plan

This is, indeed, arguable. I think that letting the user choose is a good 
idea. If the user runs the query and gets an `AnalysisException` because 
he/she is trying to perform a cartesian product, he/she can decide: ok, I am 
doing the wrong thing, let's change it; or he/she can say, well, one of my 2 
tables involved contains 10 rows, not a big deal, I want to perform it 
nonetheless, let's set `spark.sql.crossJoin.enabled=true` and run it.

> for join types other than inner and leftsemi, we still have the same 
issue, no ?

I think the current PR properly handles only the inner join case; for the 
left semi join this PR returns an incorrect result, IIUC. This needs to be fixed 
as well.
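
A small Python sketch of why the cartesian product is unavoidable here (toy data; `opaque_pred` is a hypothetical stand-in for the Python UDF, not Spark code): an opaque predicate over attributes from both sides cannot be decomposed into per-side filters or an equi-join key, so it must be evaluated on every row pair.

```python
from itertools import product

left = list(range(4))   # 4 left rows
right = list(range(3))  # 3 right rows

calls = 0

def opaque_pred(l, r):
    # Stand-in for an opaque UDF over attributes from both sides: the
    # optimizer cannot push it to either side or turn it into an
    # equi-join key, so every (l, r) pair has to be evaluated.
    global calls
    calls += 1
    return (l + r) % 5 == 0

matches = [(l, r) for l, r in product(left, right) if opaque_pred(l, r)]
# The predicate ran once per pair: |left| * |right| evaluations,
# i.e. a cartesian product, regardless of how few pairs match.
```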


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214841211
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

```
yes, and if the plan is big, than this would become quite unreadable IMHO. 
I think it would be better to refactor the message and put the plan at the end.
```
@mgaido91 Thanks for your advise, will do the refactor in next commit.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214840994
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

@dilipbiswal Thanks for your detailed check; I should have written a more 
typical case. The case we want to solve here is a UDF accessing attributes from 
both sides. I'll change the case to `dummyPythonUDF(col("a"), col("c")) === 
dummyPythonUDF(col("d"), col("c"))` in the next commit.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214840118
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

@mgaido91 Thanks. Marco, do you know if there are instances when we pick 
a cross join implicitly? It wouldn't perform very well, right? Wondering if we 
should error out or pick a bad plan. I guess I am not sure what's the right 
thing to do here. 


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214834311
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

@dilipbiswal I haven't checked the particular plan posted in that comment, 
for which I think you are right, we can handle as you suggested, but I was 
checking the case in the UT and in the description of this PR, ie. when the 
input for the Python UDF contains attributes from both sides. In that case I 
don't have a better suggestion.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214828964
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

@xuanyuanking @mgaido91 In the above example, the UDFs refer to attributes 
from distinct legs of the join. Can we not plan this better than a cross join 
in this case? I am wondering why we can't do -
```
  Join Inner, leftAlias1 = rightAlias1
  Project dummyUDF(a, b) as leftAlias1
 LocalRelation(a, b)
  Project dummyUDF(c, d) as rightAlias1
 LocalRelation(c, d)
```
Perhaps I am missing something .. 
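
The plan above can be sketched in plain Python (toy rows; `dummy_udf` is a hypothetical stand-in for the UDF): when each UDF reads attributes from only one leg, its result can be projected per side and the join becomes an ordinary hash equi-join, with no cross join needed. This rewrite does not apply when a single UDF reads attributes from both sides.

```python
from collections import defaultdict

def dummy_udf(x, y):
    # Hypothetical stand-in for the deterministic UDF in the plan sketch.
    return (x + y) % 3

left = [(1, 2), (1, 1)]   # rows (a, b) -> UDF keys 0, 2
right = [(0, 0), (1, 1)]  # rows (c, d) -> UDF keys 0, 2

# Project the UDF result on each leg first (one pass per side)...
left_keyed = [(dummy_udf(a, b), (a, b)) for a, b in left]
right_keyed = [(dummy_udf(c, d), (c, d)) for c, d in right]

# ...then an ordinary hash equi-join on the projected keys: no cross join.
buckets = defaultdict(list)
for k, row in right_keyed:
    buckets[k].append(row)
joined = [(lrow, rrow) for k, lrow in left_keyed for rrow in buckets[k]]
```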


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214824642
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends 
Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed 
from
+  // the new join conditions, if all conditions is unevaluable, we 
should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole 
commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the 
join plan will be " +
--- End diff --

yes, and if the plan is big, then this would become quite unreadable IMHO. 
I think it would be better to refactor the message and put the plan at the end.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214823799
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
--- End diff --

The log will be shown like this:
```
16:13:35.218 WARN org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin: The whole commonJoinCondition:List((dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))) of the join plan:
 Join Inner, (dummyUDF(a#5, b#6) = dummyUDF(d#15, c#14))
:- LocalRelation [a#5, b#6]
+- LocalRelation [c#14, d#15]
 is unevaluable, it will be ignored and the join plan will be turned to cross join.
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214823548
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
--- End diff --

Thanks for the reminder, `spark.sql.crossJoin.enabled` should be checked here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214818536
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
--- End diff --

not sure that inlining the plan here makes this warning very readable...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214818227
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
--- End diff --

I think this should be done only in this case: 
https://github.com/apache/spark/pull/22326#discussion_r214813591


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214818719
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+s"turned to cross join.")
+  Cross
+} else joinType
--- End diff --

```
} else {
  joinType
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214818432
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+s"turned to cross join.")
--- End diff --

The `s` string-interpolator prefix should be removed here, since this part of the message has no interpolated values.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214813591
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
--- End diff --

What about also checking `spark.sql.crossJoin.enabled` and allowing the transformation only in that case?
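
A minimal Spark-free sketch of the gated fallback being proposed (plain Python; the function and parameter names are illustrative, not Catalyst APIs, and `cross_join_enabled` stands in for `spark.sql.crossJoin.enabled`):

```python
# Hedged sketch: only fall back to a cross join when the user has
# explicitly allowed cross joins; otherwise fail fast instead of
# silently producing a cartesian product.
def choose_join_type(common_conditions, evaluable_conditions,
                     join_type, cross_join_enabled):
    # mirrors: commonJoinCondition.nonEmpty && newJoinCond.isEmpty
    if common_conditions and not evaluable_conditions:
        if not cross_join_enabled:
            raise ValueError(
                "Join condition is unevaluable within the join; set "
                "spark.sql.crossJoin.enabled=true to allow the cross-join "
                "fallback")
        return "Cross"
    return join_type
```

The raise branch is one possible behavior when the flag is off; keeping the original join type (and failing later in analysis) would be the other option.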


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214807566
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
 ---
@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
 }
 assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("Python UDF refers to the attributes from more than one child in join condition") {
--- End diff --

Got it, I will add it in this commit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214807480
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
--- End diff --

Makes sense, I'll leave a warning here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214807200
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.crossJoin(right).filter(f("a", "b"))
--- End diff --

ditto, the correct test is `df = left.join(right, f("a", "b"))`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214806996
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.crossJoin(right).filter(f("a", "b"))
+self.assertEqual(df.collect(), [Row(a=1, b=1)])
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
--- End diff --

Yep, sorry for the mess here, a leftover commit slipped in. I'll fix it soon; how do I cancel the test run?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214805817
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala
 ---
@@ -97,6 +100,17 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
 }
 assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("Python UDF refers to the attributes from more than one child in join condition") {
--- End diff --

I would add a JIRA prefix here - this change sounds more like fixing a 
particular problem.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214805785
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
--- End diff --

Sorry for the mess here, I'm changing it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214805695
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.crossJoin(right).filter(f("a", "b"))
--- End diff --

BTW, why do we explicitly test cross join here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214805611
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -545,6 +545,15 @@ def test_udf_in_filter_on_top_of_join(self):
 right = self.spark.createDataFrame([Row(b=1)])
 f = udf(lambda a, b: a == b, BooleanType())
 df = left.crossJoin(right).filter(f("a", "b"))
+
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.crossJoin(right).filter(f("a", "b"))
+self.assertEqual(df.collect(), [Row(a=1, b=1)])
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
--- End diff --

Looks duplicated


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214805020
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1208,9 +1208,21 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) Cross else joinType
--- End diff --

I think we should at least warn or leave a note that an unevaluable condition (e.g. a Python UDF) in the join condition will be ignored and the join turned into a cross join.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
GitHub user xuanyuanking opened a pull request:

https://github.com/apache/spark/pull/22326

[SPARK-25314][SQL] Fix Python UDF accessing attributes from both side of join in join conditions

## What changes were proposed in this pull request?

Thanks to @bahchis for reporting this. This is follow-up work for #16581; this PR fixes the scenario of a Python UDF accessing attributes from both sides of a join in the join condition.

## How was this patch tested?

Add regression tests in PySpark and `BatchEvalPythonExecSuite`.
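
The core of the bug can be sketched without Spark: a join predicate is pushed to the left child only if it references only left-side attributes, to the right child only if it references only right-side attributes, and otherwise lands in the "common" bucket, which is where a Python UDF over both sides ends up. A hedged, Spark-free illustration (all names are illustrative, not Catalyst APIs):

```python
# Sketch of splitting join predicates by which side's attributes they
# reference; a predicate spanning both sides can be pushed to neither
# child and must stay with the join itself.
def split_join_conditions(conditions, left_attrs, right_attrs):
    left_only, right_only, common = [], [], []
    for cond, refs in conditions:  # refs: attributes the predicate uses
        refs = set(refs)
        if refs <= set(left_attrs):
            left_only.append(cond)
        elif refs <= set(right_attrs):
            right_only.append(cond)
        else:
            common.append(cond)  # e.g. a Python UDF over both sides
    return left_only, right_only, common
```

When everything in the common bucket is unevaluable within the join, the optimizer in this PR falls back to a cross join and evaluates those predicates in a filter on top.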

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xuanyuanking/spark SPARK-25314

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22326


commit 23b10283500b18462a71eb525d4762dd33c3d4fa
Author: Yuanjian Li 
Date:   2018-09-04T06:43:14Z

Fix Python UDF accessing attributes from both side of join in join conditions




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org