[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-27 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22326


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220777535
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
@@ -100,6 +105,29 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("SPARK-25314: Python UDF refers to the attributes from more than one child " +
--- End diff --

Got it. I used this to mock the Python UDF in the IDE; I will do this in a follow-up PR with a new test suite in `org.apache.spark.sql.catalyst.optimizer`. Reverted in 2b6977d.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220777383
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -552,6 +552,96 @@ def test_udf_in_filter_on_top_of_join(self):
         df = left.crossJoin(right).filter(f("a", "b"))
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"))
+        with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'):
+            df.collect()
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+    def test_udf_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"), "leftsemi")
+        with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'):
+            df.collect()
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+    def test_udf_and_filter_in_join_condition(self):
--- End diff --

Makes sense; these were just for checking during implementation. Deleted both in 2b6977d.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220770012
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
@@ -100,6 +105,29 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("SPARK-25314: Python UDF refers to the attributes from more than one child " +
--- End diff --

This is still an end-to-end test; I don't think we need it.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220769336
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -552,6 +552,96 @@ def test_udf_in_filter_on_top_of_join(self):
         df = left.crossJoin(right).filter(f("a", "b"))
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"))
+        with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'):
+            df.collect()
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+    def test_udf_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"), "leftsemi")
+        with self.assertRaisesRegexp(AnalysisException, 'Detected implicit cartesian product'):
+            df.collect()
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+    def test_udf_and_filter_in_join_condition(self):
--- End diff --

This test (and the corresponding one for left semi join) is not very useful. A filter in the join condition will be pushed down, so this test is basically the same as `test_udf_in_join_condition`.
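
A minimal sketch of the pushdown argument, reusing `splitConjunctivePredicates` from `PredicateHelper` the way the `joins.scala` diff quoted later in this thread does; the `PushdownSketch` object and its `hasPythonUDF` parameter are illustrative, not PR code:

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}

// For a condition like `udf(a, b) AND b > 0`, split the conjuncts and
// separate the UDF ones: the non-UDF group (`b > 0`) is pushed below the
// join by the regular filter-pushdown rules, so the join condition that
// remains is the UDF conjunct alone -- the shape that
// test_udf_in_join_condition already exercises.
object PushdownSketch extends PredicateHelper {
  def partitionByUDF(
      condition: Expression,
      hasPythonUDF: Expression => Boolean): (Seq[Expression], Seq[Expression]) = {
    splitConjunctivePredicates(condition).partition(hasPythonUDF)
  }
}
```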


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220628624
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
@@ -100,6 +104,28 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("SPARK-25314: Python UDF refers to the attributes from more than one child " +
+    "in join condition") {
+    def dummyPythonUDFTest(): Unit = {
+      val df = Seq(("Hello", 4)).toDF("a", "b")
+      val df2 = Seq(("Hello", 4)).toDF("c", "d")
+      val joinDF = df.join(df2,
+        dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c")))
+      val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect {
+        case b: BatchEvalPythonExec => b
+      }
+      assert(qualifiedPlanNodes.size == 1)
+    }
+    // Test without spark.sql.crossJoin.enabled set
+    val errMsg = intercept[AnalysisException] {
+      dummyPythonUDFTest()
+    }
+    assert(errMsg.getMessage.startsWith("Detected implicit cartesian product"))
+    // Test with spark.sql.crossJoin.enabled=true
+    spark.conf.set("spark.sql.crossJoin.enabled", "true")
--- End diff --

Thanks, done in 7f66954.
```
So I'd prefer having one or 2 end-to-end tests and create a new suite testing only the rule and the plan transformation, both for having lower testing time and finer grained tests checking that the output plan is indeed the expected one (not only checking the result of the query).
```
Makes sense, I will add a plan test for this rule.
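
For reference, a rough skeleton of what such a rule-level suite could look like in the usual catalyst `PlanTest` style; `dummyUDF` and the commented test body are illustrative placeholders, not the actual follow-up code:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.optimizer.PullOutPythonUDFInJoinCondition
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor

class PullOutPythonUDFInJoinConditionSuite extends PlanTest {

  object Optimize extends RuleExecutor[LogicalPlan] {
    val batches = Batch("PullOutPythonUDFInJoinCondition", Once,
      PullOutPythonUDFInJoinCondition) :: Nil
  }

  // Placeholder: constructing a real PythonUDF needs a PythonFunction,
  // so the body is elided here.
  private def dummyUDF(children: Expression*): Expression = ???

  test("pull out PythonUDF from an inner join condition") {
    // Sketch: build `left.join(right, condition = dummyUDF(a, b))`, run
    // Optimize.execute(query.analyze), and comparePlans against
    // Filter(dummyUDF(a, b), Join(left, right, Inner, None)).
  }
}
```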


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220586456
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
@@ -100,6 +104,28 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
     }
     assert(qualifiedPlanNodes.size == 1)
   }
+
+  test("SPARK-25314: Python UDF refers to the attributes from more than one child " +
+    "in join condition") {
+    def dummyPythonUDFTest(): Unit = {
+      val df = Seq(("Hello", 4)).toDF("a", "b")
+      val df2 = Seq(("Hello", 4)).toDF("c", "d")
+      val joinDF = df.join(df2,
+        dummyPythonUDF(col("a"), col("c")) === dummyPythonUDF(col("d"), col("c")))
+      val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect {
+        case b: BatchEvalPythonExec => b
+      }
+      assert(qualifiedPlanNodes.size == 1)
+    }
+    // Test without spark.sql.crossJoin.enabled set
+    val errMsg = intercept[AnalysisException] {
+      dummyPythonUDFTest()
+    }
+    assert(errMsg.getMessage.startsWith("Detected implicit cartesian product"))
+    // Test with spark.sql.crossJoin.enabled=true
+    spark.conf.set("spark.sql.crossJoin.enabled", "true")
--- End diff --

please use `withSQLConf`
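
A minimal sketch of the suggested change, assuming the suite's existing `dummyPythonUDFTest()` helper from the diff above; unlike `spark.conf.set`, `withSQLConf` restores the previous value when the block exits, so the setting cannot leak into later tests:

```scala
// Test without spark.sql.crossJoin.enabled set
val errMsg = intercept[AnalysisException] {
  dummyPythonUDFTest()
}
assert(errMsg.getMessage.startsWith("Detected implicit cartesian product"))
// Test with spark.sql.crossJoin.enabled=true, scoped to this block only
withSQLConf("spark.sql.crossJoin.enabled" -> "true") {
  dummyPythonUDFTest()
}
```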


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220576239
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
+      // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        logWarning(s"The join condition:$condition of the join plan contains " +
+          "PythonUDF only, it will be moved out and the join plan will be turned to cross " +
+          s"join. This plan shows below:\n $j")
+        None
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike => Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
+            j.left.output.map(_.toAttribute),
+          Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
--- End diff --

Thanks, done in d2739af.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220576188
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
+      // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        logWarning(s"The join condition:$condition of the join plan contains " +
+          "PythonUDF only, it will be moved out and the join plan will be turned to cross " +
+          s"join. This plan shows below:\n $j")
--- End diff --

Got it, done in d2739af.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220576115
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
+      // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
--- End diff --

Thanks, done in d2739af.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220576062
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
--- End diff --

Makes sense, it duplicates the log message. Done in d2739af.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220575840
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
--- End diff --

Thanks, done in d2739af.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220569284
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
--- End diff --

I think we don't need the second sentence here, i.e. the one starting with `If join condition ...`


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220570791
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
+      // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        logWarning(s"The join condition:$condition of the join plan contains " +
+          "PythonUDF only, it will be moved out and the join plan will be turned to cross " +
+          s"join. This plan shows below:\n $j")
--- End diff --

can we at least remove the whole plan from the warning? Plans can be pretty 
big...


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220568484
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
--- End diff --

nits:
 - `they are`
 - missing space before `(`


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220570975
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
+      // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        logWarning(s"The join condition:$condition of the join plan contains " +
+          "PythonUDF only, it will be moved out and the join plan will be turned to cross " +
+          s"join. This plan shows below:\n $j")
+        None
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike => Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
+            j.left.output.map(_.toAttribute),
+          Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
--- End diff --

nit: indentation


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220570021
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,53 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * PythonUDF in join condition can not be evaluated, this rule will detect the PythonUDF
+ * and pull them out from join condition. For python udf accessing attributes from only one side,
+ * they would be pushed down by operation push down rules. If not(e.g. user disables filter push
+ * down rules), we need to pull them out in this rule too.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // If condition expression contains python udf, it will be moved out from
+      // the new join conditions. If join condition has python udf only, it will be turned
+      // to cross join and the crossJoinEnable will be checked in CheckCartesianProducts.
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
--- End diff --

nit: `splitConjunctivePredicates(condition.get).partition(...)` seems clearer to me


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220568332
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
--- End diff --

Thanks, as per our discussion in https://github.com/apache/spark/pull/22326/files#r220518094.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220567623
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
--- End diff --

Got it, thanks :)


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220567381
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
--- End diff --

Thanks, done in 87f0f50.
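
The nit itself is not quoted here, but a common simplification of this helper (an assumption on my part, not necessarily what 87f0f50 did) is to use `TreeNode.find` instead of `collectFirst`, which avoids the partial function:

```scala
// Equivalent membership test without building a partial function:
def hasPythonUDF(expression: Expression): Boolean = {
  expression.find(_.isInstanceOf[PythonUDF]).isDefined
}
```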


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220567301
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
--- End diff --

Thanks, done in 87f0f50.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220567216
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
--- End diff --

Thanks, done in 87f0f50.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220564636
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
--- End diff --

ah, let's leave left anti join then, thanks for trying!


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220562279
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
--- End diff --

I tried two ways to implement LeftAnti here:
1. Use `Except(join.left, left semi result, isAll = false)` to simulate it; this is banned by the strategy, and there is actually no physical plan for `Except` either: https://github.com/apache/spark/blob/89671a27e783d77d4bfaec3d422cc8dd468ef04c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala#L557-L559
2. Also use a cross join plus filter to simulate it, but it may be unreachable when there is only a UDF in the anti join condition, because after the cross join it's hard to roll back to the original status. A UDF plus a normal condition can be simulated by
```
Project(
  j.left.output.map(_.toAttribute),
  Filter(Not(udf.reduceLeft(And)),
    newJoin.copy(joinType = Inner, condition = Some(Not(rest.reduceLeft(And))))))
```


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220526661
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
--- End diff --

No problem, I'll change it.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220525992
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
--- End diff --

Then shall we make it better? E.g. only log the warning if it really becomes a cross join, i.e. the join condition is `None`.
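
The updated revision quoted earlier in this thread does exactly that, emitting the warning only in the branch where the condition is dropped:

```scala
val newCondition = if (rest.isEmpty) {
  // Only reached when the condition held nothing but PythonUDFs,
  // i.e. the join really degenerates into a cross join.
  logWarning(s"The join condition:$condition of the join plan contains " +
    "PythonUDF only, it will be moved out and the join plan will be turned to cross " +
    s"join. This plan shows below:\n $j")
  None
} else {
  Some(rest.reduceLeft(And))
}
```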


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220524866
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
--- End diff --

Let me try.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220524840
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
--- End diff --

Let me try.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220523238
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
--- End diff --

Can we keep this? As we discussed before, it's a little strange for users to get an implicit cartesian product exception when a Python UDF appears in the join condition; keeping this log may give them a clue. WDYT?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220522504
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
--- End diff --

```
can we then update the comment of this class in order to reflect what it 
actually does?
```
I'll update the comment soon.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220518094
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
+        case LeftSemi =>
+          Project(
--- End diff --

so we are simulating a left semi join here. Seems we can do the same thing 
for left anti join.
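
For readers following the thread, here is a minimal sketch of the simulation being discussed. It is not the PR's literal code; it assumes Catalyst's logical plan nodes are in scope and that `udf`/`rest` are the conjuncts partitioned as in the rule above:

```scala
// Sketch only: pulling a PythonUDF out of a left semi join's condition.
val newJoin = j.copy(condition = rest.reduceLeftOption(And))
joinType match {
  case _: InnerLike =>
    // Inner join: evaluating the pulled-out UDFs as a post-join filter is safe.
    Filter(udf.reduceLeft(And), newJoin)
  case LeftSemi =>
    // Left semi: run the join as inner, filter with the UDFs, then project the
    // left side's columns so the output schema matches a real left semi join.
    Project(j.left.output, Filter(udf.reduceLeft(And), newJoin.copy(joinType = Inner)))
}
```

Whether LeftAnti can be handled analogously is exactly what gets discussed further down the thread.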


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220518059
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
--- End diff --

thanks for the explanation @cloud-fan, makes sense. Can we then update the comment of this class so that it reflects what it actually does?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220517587
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
--- End diff --

do we really need this warning? If it becomes a cross join, users will get an error anyway if cross join is disabled.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220517383
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
--- End diff --

the comment is outdated


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220516732
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
--- End diff --

This doesn't matter. We can't evaluate a python udf in the join condition and need to pull it out, that's all.

For python udfs accessing attributes from only one side, these would be pushed down by other rules. If they aren't (e.g. the user disables the filter pushdown rule), we need to pull them out here, too. Anyway it's orthogonal to this rule.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220482709
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
--- End diff --

I see no check that this requires attributes from both sides, shall we add it? I see that if this is not the case the predicate should already have been pushed down, but an additional sanity check is worthwhile IMHO.
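
A sketch of what such a sanity check could look like (hypothetical, not part of the PR as reviewed; it assumes Catalyst's `references`/`outputSet` helpers):

```scala
// Hypothetical sanity check: the pulled-out UDF should need attributes from
// both legs of the join, otherwise pushdown should already have handled it.
def needsBothSides(udf: PythonUDF, j: Join): Boolean =
  !udf.references.subsetOf(j.left.outputSet) &&
    !udf.references.subsetOf(j.right.outputSet)
```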


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220482278
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
+      } else {
+        Some(rest.reduceLeft(And))
+      }
+      val newJoin = j.copy(condition = newCondition)
+      joinType match {
+        case _: InnerLike =>
+          Filter(udf.reduceLeft(And), newJoin)
--- End diff --

nit: this can go on the line above


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220481529
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      // if condition expression contains python udf, it will be moved out from
+      // the new join conditions, and the join type will be changed to CrossJoin.
+      logWarning(s"The join condition:$condition of the join plan contains " +
+        "PythonUDF, it will be moved out and the join plan will be turned to cross " +
+        s"join when its the only condition. This plan shows below:\n $j")
+      val (udf, rest) =
+        condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+      val newCondition = if (rest.isEmpty) {
+        Option.empty
--- End diff --

`None`


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220484279
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,51 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object PullOutPythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+        if condition.isDefined && hasPythonUDF(condition.get) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
--- End diff --

can't we support `LeftAnti` too?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220479127
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
--- End diff --

```
@xuanyuanking Oh.. is it because the UDFS referring to single leg would 
have been pushed down and we will only have UDFs referring to both legs in the 
join condition when we come here ?
```
Yea that's right.
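
In other words (an illustrative sketch, not code from the PR), conjuncts whose references fit a single side are handled by predicate pushdown, so only predicates spanning both sides survive to this rule:

```scala
// Illustrative: splitting a conjunctive join condition by which side it touches.
val conjuncts = condition.map(splitConjunctivePredicates).getOrElse(Nil)
val (singleSide, bothSides) = conjuncts.partition { e =>
  e.references.subsetOf(j.left.outputSet) || e.references.subsetOf(j.right.outputSet)
}
// `singleSide` predicates are pushed below the join by other optimizer rules;
// a PythonUDF left in `bothSides` is what this rule has to pull out.
```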


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220478633
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      if (SQLConf.get.crossJoinEnabled) {
--- End diff --

@mgaido91 Yea, I reverted the changes to `CheckCartesianProducts` and deleted the check in the new rule in d1db33a.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220463070
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      if (SQLConf.get.crossJoinEnabled) {
--- End diff --

@mgaido91 It's probably because this suite only exercises one rule of the optimizer? :-)
```
object Optimize extends RuleExecutor[LogicalPlan] {
  val batches = Batch("Check Cartesian Products", Once, CheckCartesianProducts) :: Nil
}
```
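
For completeness, a hedged sketch of how the suite's optimizer could exercise the new rule as well (hypothetical batch wiring, mirroring the snippet above):

```scala
// Hypothetical: chain the pull-out rule before the cartesian-product check so
// the suite sees the rewritten join, as the full optimizer would.
object Optimize extends RuleExecutor[LogicalPlan] {
  val batches =
    Batch("Extract PythonUDF From JoinCondition", Once, PullOutPythonUDFInJoinCondition) ::
    Batch("Check Cartesian Products", Once, CheckCartesianProducts) :: Nil
}
```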


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220453684
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      if (SQLConf.get.crossJoinEnabled) {
--- End diff --

>  the udf check in CheckCartesianProducts is not work because we have 
pulled out the udf in join condition.

yes, but the point is exactly that we don't need that check if we just do 
the change here.

> It will also break the UT add in BatchEvalPythonExecSuite.

This is interesting, why?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220440612
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
--- End diff --

@cloud-fan Just saw your comment, which validated my understanding that this rule relies on predicates being pushed down through the join. However, the pushdownPredicateThroughJoin rule is not in the nonExcludableRules list, so can we rely on it always being fired here?
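
For reference, a sketch of the concern (assuming a `spark` session and the `spark.sql.optimizer.excludedRules` conf): any rule not listed in `nonExcludableRules` can be disabled by users, so the pushdown this rule relies on is not guaranteed to have run:

```scala
// Illustrative: a user can exclude predicate pushdown unless the rule is made
// non-excludable, in which case single-side UDFs would reach the join intact.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughJoin")
```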


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220438880
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
--- End diff --

@xuanyuanking Oh.. it is because the UDFs referring to a single leg would have been pushed down, and we will only have UDFs referring to both legs in the join condition when we come here?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220437721
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1308,6 +1312,16 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
     if (SQLConf.get.crossJoinEnabled) {
       plan
     } else plan transform {
+      case j @ Join(_, _, _, condition)
+          if condition.isDefined && PullOutPythonUDFInJoinCondition.hasPythonUDF(condition.get) =>
+        // if the crossJoinEnabled is false, a RuntimeException will be thrown later while
+        // the PythonUDF need to access both side of join, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException(s"Detected the join condition:${j.condition} of this join " +
--- End diff --

The reason it can't be pulled out in the case https://github.com/apache/spark/pull/22326#discussion_r220418201 is not a join type problem. The screenshot is from a test where I do the check with `havePythonUDFInAllConditions`, and the result shows it returns by hitting a RuntimeException even though it's an inner join.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220436983
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
--- End diff --

@xuanyuanking So here we are finding out if the join condition has a python UDF. I am trying to understand where we make the determination that this python UDF refers to attributes of both legs of the join? Can you please let me know. Thank you.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220436485
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1308,6 +1312,16 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
     if (SQLConf.get.crossJoinEnabled) {
       plan
     } else plan transform {
+      case j @ Join(_, _, _, condition)
+          if condition.isDefined && PullOutPythonUDFInJoinCondition.hasPythonUDF(condition.get) =>
+        // if the crossJoinEnabled is false, a RuntimeException will be thrown later while
+        // the PythonUDF need to access both side of join, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException(s"Detected the join condition:${j.condition} of this join " +
--- End diff --

when will we hit it? If the python udf can't be pulled out because of join 
type problems, we already throw exception at `PullOutPythonUDFInJoinCondition`. 
Did I miss something?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220433980
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
--- End diff --

Maybe Wenchen means PullOutPythonUDFInJoinCondition, done in 98cd3cc.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220433916
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      if (SQLConf.get.crossJoinEnabled) {
+        // if condition expression contains python udf, it will be moved out from
+        // the new join conditions, and the join type will be changed to CrossJoin.
+        logWarning(s"The join condition:$condition of the join plan contains " +
+          "PythonUDF, it will be moved out and the join plan will be " +
+          s"turned to cross join. This plan shows below:\n $j")
+        val (udf, rest) =
+          condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+        val newCondition = if (rest.isEmpty) {
+          Option.empty
+        } else {
+          Some(rest.reduceLeft(And))
+        }
+        val newJoin = j.copy(joinType = Cross, condition = newCondition)
+        joinType match {
+          case _: InnerLike =>
+            Filter(udf.reduceLeft(And), newJoin)
+          case LeftSemi =>
+            Project(
--- End diff --

Yes, that's right.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220433940
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
--- End diff --

ah yes, thanks, done in 98cd3cc.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220433995
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -165,6 +165,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
     Batch("LocalRelation", fixedPoint,
       ConvertToLocalRelation,
       PropagateEmptyRelation) :+
+    Batch("Extract PythonUDF From JoinCondition", Once,
+      HandlePythonUDFInJoinCondition) :+
     // The following batch should be executed after batch "Join Reorder" and "LocalRelation".
--- End diff --

Thanks, done in 98cd3cc.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220433896
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -552,6 +552,92 @@ def test_udf_in_filter_on_top_of_join(self):
         df = left.crossJoin(right).filter(f("a", "b"))
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"))
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+    def test_udf_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"), "leftsemi")
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+    def test_udf_and_filter_in_join_condition(self):
+        # regression test for SPARK-25314
+        # test the complex scenario with both udf(non-deterministic)
+        # and normal filter(deterministic)
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+        f = udf(lambda a, b: a == b, BooleanType())
--- End diff --

Sorry for the old comment, done in 98cd3cc.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220433190
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      if (SQLConf.get.crossJoinEnabled) {
--- End diff --

It will also break the UT added in `BatchEvalPythonExecSuite`.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220432728
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala ---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
       if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDF which need access both side of join side by changing the new join
+ * type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+    expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(_, _, joinType, condition)
+      if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+      if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+        // The current strategy only support InnerLike and LeftSemi join because for other type,
+        // it breaks SQL semantic if we run the join condition as a filter after join. If we pass
+        // the plan here, it'll still get a an invalid PythonUDF RuntimeException with message
+        // `requires attributes from more than one child`, we throw firstly here for better
+        // readable information.
+        throw new AnalysisException("Using PythonUDF in join condition of join type" +
+          s" $joinType is not supported.")
+      }
+      if (SQLConf.get.crossJoinEnabled) {
--- End diff --

```
What about just doing the change?
```
If we just do the change here, the udf check in `CheckCartesianProducts` won't work because we have already pulled out the udf from the join condition.
---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220432468
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1304,10 +1307,27 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  /**
+   * Check if a join contains PythonUDF in join condition.
+   */
+  def hasPythonUDFInJoinCondition(join: Join): Boolean = {
+    val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+    conditions.exists(HandlePythonUDFInJoinCondition.hasPythonUDF)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan =
     if (SQLConf.get.crossJoinEnabled) {
       plan
     } else plan transform {
+      case j @ Join(_, _, _, _) if hasPythonUDFInJoinCondition(j) =>
--- End diff --

I mean, as per our discussion in https://github.com/apache/spark/pull/22326#discussion_r220198104, `HandlePythonUDFInJoinCondition` no longer throws the AnalysisException. If I do the check with `havePythonUDFInAllConditions`, the above case will throw a RuntimeException.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220428377
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -552,6 +552,92 @@ def test_udf_in_filter_on_top_of_join(self):
         df = left.crossJoin(right).filter(f("a", "b"))
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"))
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+    def test_udf_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"), "leftsemi")
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+    def test_udf_and_filter_in_join_condition(self):
+        # regression test for SPARK-25314
+        # test the complex scenario with both udf(non-deterministic)
+        # and normal filter(deterministic)
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2])
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)])
+
+    def test_udf_and_filter_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        # test the complex scenario with both udf(non-deterministic)
+        # and normal filter(deterministic)
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi")
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+    def test_udf_and_common_filter_in_join_condition(self):
+        # regression test for SPARK-25314
+        # test the complex scenario with both udf(non-deterministic)
+        # and common filter(deterministic)
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
--- End diff --

same question as above ..


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220428290
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -552,6 +552,92 @@ def test_udf_in_filter_on_top_of_join(self):
         df = left.crossJoin(right).filter(f("a", "b"))
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"))
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+    def test_udf_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"), "leftsemi")
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+    def test_udf_and_filter_in_join_condition(self):
+        # regression test for SPARK-25314
+        # test the complex scenario with both udf(non-deterministic)
+        # and normal filter(deterministic)
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+        f = udf(lambda a, b: a == b, BooleanType())
--- End diff --

is this udf non-deterministic?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220428332
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -552,6 +552,92 @@ def test_udf_in_filter_on_top_of_join(self):
         df = left.crossJoin(right).filter(f("a", "b"))
         self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+    def test_udf_in_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1)])
+        right = self.spark.createDataFrame([Row(b=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"))
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+    def test_udf_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, f("a", "b"), "leftsemi")
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+    def test_udf_and_filter_in_join_condition(self):
+        # regression test for SPARK-25314
+        # test the complex scenario with both udf(non-deterministic)
+        # and normal filter(deterministic)
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+        f = udf(lambda a, b: a == b, BooleanType())
+        df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2])
+        with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+            self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)])
+
+    def test_udf_and_filter_in_left_semi_join_condition(self):
+        # regression test for SPARK-25314
+        # test the complex scenario with both udf(non-deterministic)
+        # and normal filter(deterministic)
+        from pyspark.sql.functions import udf
+        left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+        right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+        f = udf(lambda a, b: a == b, BooleanType())
--- End diff --

is this udf non-deterministic?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220424663
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1304,10 +1307,27 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  /**
+   * Check if a join contains PythonUDF in join condition.
+   */
+  def hasPythonUDFInJoinCondition(join: Join): Boolean = {
+    val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+    conditions.exists(HandlePythonUDFInJoinCondition.hasPythonUDF)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan =
     if (SQLConf.get.crossJoinEnabled) {
       plan
     } else plan transform {
+      case j @ Join(_, _, _, _) if hasPythonUDFInJoinCondition(j) =>
--- End diff --

I don't get it. The error means we didn't pull out the python udf, but we should have already thrown an exception in `HandlePythonUDFInJoinCondition`.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220418201
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1304,10 +1307,27 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
     }
   }
 
+  /**
+   * Check if a join contains PythonUDF in join condition.
+   */
+  def hasPythonUDFInJoinCondition(join: Join): Boolean = {
+    val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+    conditions.exists(HandlePythonUDFInJoinCondition.hasPythonUDF)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan =
     if (SQLConf.get.crossJoinEnabled) {
       plan
     } else plan transform {
+      case j @ Join(_, _, _, _) if hasPythonUDFInJoinCondition(j) =>
--- End diff --

Maybe not; we should keep the current logic, as the test below shows:

![image](https://user-images.githubusercontent.com/4833765/46055860-866c0180-c180-11e8-94e4-1f86af04b42a.png)
In this join condition there is only one python udf, but we still need this AnalysisException. If the logic here changed to `havePythonUDFInAllConditions`, you'd get a runtime exception of `requires attributes from more than one child.` like:

![image](https://user-images.githubusercontent.com/4833765/46055852-7d7b3000-c180-11e8-867c-f522ca175920.png)



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220289897
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because for
+// other types it breaks SQL semantics to run the join condition as a filter
+// after the join. If we passed the plan on here, we would still get an invalid
+// PythonUDF RuntimeException with the message `requires attributes from more
+// than one child`; we throw here first for more readable information.
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+  }
+  if (SQLConf.get.crossJoinEnabled) {
+// If the condition expression contains a Python UDF, it will be moved out of
+// the new join conditions, and the join type will be changed to CrossJoin.
+logWarning(s"The join condition:$condition of the join plan contains " +
+  "PythonUDF, it will be moved out and the join plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+val (udf, rest) =
+  condition.map(splitConjunctivePredicates).get.partition(hasPythonUDF)
+val newCondition = if (rest.isEmpty) {
+  Option.empty
+} else {
+  Some(rest.reduceLeft(And))
+}
+val newJoin = j.copy(joinType = Cross, condition = newCondition)
+joinType match {
+  case _: InnerLike =>
+Filter(udf.reduceLeft(And), newJoin)
+  case LeftSemi =>
+Project(
--- End diff --

we don't need to add the project if we don't change the join type, right?
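
For intuition, a minimal sketch (assuming a `SparkSession` named `spark`; not code from this PR) of why the left-semi case needs the extra `Project` while the inner case does not:

```scala
import spark.implicits._

val left = Seq((1, 1)).toDF("a", "a1")
val right = Seq((1, 2)).toDF("b", "b1")

// A left-semi join outputs only the left side's columns.
left.join(right, left("a") === right("b"), "leftsemi").printSchema()  // a, a1

// The rewritten cross join + filter outputs both sides, so the rule must
// re-project to left.output to restore the semi-join schema.
left.crossJoin(right).where(left("a") === right("b")).printSchema()   // a, a1, b, b1
```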


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220289293
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because for
+// other types it breaks SQL semantics to run the join condition as a filter
+// after the join. If we passed the plan on here, we would still get an invalid
+// PythonUDF RuntimeException with the message `requires attributes from more
+// than one child`; we throw here first for more readable information.
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+  }
+  if (SQLConf.get.crossJoinEnabled) {
--- End diff --

+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220289231
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
--- End diff --

why not just `if hasPythonUDF(condition)`?
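
A sketch of the suggested simplification (assuming `hasPythonUDF` and Catalyst's `Expression` in scope, as in the quoted diff): `collectFirst` already recurses through the whole expression tree, including both sides of an `And`, so splitting into conjuncts first adds nothing to the guard.

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Option.exists covers the None case that getOrElse(Nil) handled before.
// The name hasPythonUDFInCondition is illustrative only.
def hasPythonUDFInCondition(condition: Option[Expression]): Boolean =
  condition.exists(hasPythonUDF)
```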


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220288991
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
--- End diff --

nit: PullOutPythonUDFInJoinCondition


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220273079
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,56 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  def hasPythonUDF(expression: Expression): Boolean = {
+expression.collectFirst { case udf: PythonUDF => udf }.isDefined
+  }
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(hasPythonUDF) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because for
+// other types it breaks SQL semantics to run the join condition as a filter
+// after the join. If we passed the plan on here, we would still get an invalid
+// PythonUDF RuntimeException with the message `requires attributes from more
+// than one child`; we throw here first for more readable information.
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+  }
+  if (SQLConf.get.crossJoinEnabled) {
--- End diff --

Why are you doing this? What about just doing the change? If the result is a cartesian product, it will be detected by the corresponding check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220272571
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1304,10 +1307,27 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
 }
   }
 
+  /**
+   * Check if a join contains PythonUDF in join condition.
+   */
+  def hasPythonUDFInJoinCondition(join: Join): Boolean = {
+val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+conditions.exists(HandlePythonUDFInJoinCondition.hasPythonUDF)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan =
 if (SQLConf.get.crossJoinEnabled) {
   plan
 } else plan transform {
+  case j @ Join(_, _, _, _) if hasPythonUDFInJoinCondition(j) =>
--- End diff --

If there are other conditions, the presence of a PythonUDF doesn't imply a cartesian product. I think this should be a `havePythonUDFInAllConditions`.
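
A minimal sketch of that suggested variant (the name `havePythonUDFInAllConditions` comes from the comment above; `Join`, `splitConjunctivePredicates`, and `HandlePythonUDFInJoinCondition.hasPythonUDF` assumed in scope as in the quoted diff): flag the join only when every conjunct contains a PythonUDF, since any remaining deterministic conjunct still constrains the join.

```scala
def havePythonUDFInAllConditions(join: Join): Boolean = {
  val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
  conditions.nonEmpty && conditions.forall(HandlePythonUDFInJoinCondition.hasPythonUDF)
}
```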


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220272683
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -165,6 +165,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
 Batch("LocalRelation", fixedPoint,
   ConvertToLocalRelation,
   PropagateEmptyRelation) :+
+Batch("Extract PythonUDF From JoinCondition", Once,
+HandlePythonUDFInJoinCondition) :+
 // The following batch should be executed after batch "Join Reorder" and "LocalRelation".
--- End diff --

please update this comment too with the new rule


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220267389
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1304,10 +1307,27 @@ object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper {
 }
   }
 
+  /**
+   * Check if a join contains PythonUDF in join condition.
+   */
+  def hasPythonUDFInJoinCondition(join: Join): Boolean = {
+val conditions = join.condition.map(splitConjunctivePredicates).getOrElse(Nil)
+conditions.exists(HandlePythonUDFInJoinCondition.hasPythonUDF)
+  }
+
   def apply(plan: LogicalPlan): LogicalPlan =
 if (SQLConf.get.crossJoinEnabled) {
   plan
 } else plan transform {
+  case j @ Join(_, _, _, _) if hasPythonUDFInJoinCondition(j) =>
--- End diff --

Moved the check logic into `CheckCartesianProducts` only, as discussed in https://github.com/apache/spark/pull/22326#discussion_r220236374, and added a corresponding UT in CheckCartesianProductsSuite. The checking order is the PythonUDF check first, then the original cartesian product check.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220236374
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,60 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because for
+// other types it breaks SQL semantics to run the join condition as a filter
+// after the join. If we passed the plan on here, we would still get an invalid
+// PythonUDF RuntimeException with the message `requires attributes from more
+// than one child`; we throw here first for more readable information.
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+  }
+  if (SQLConf.get.crossJoinEnabled) {
+// If the condition expression contains a Python UDF, it will be moved out of
+// the new join conditions, and the join type will be changed to CrossJoin.
+logWarning(s"The join condition:$condition of the join plan contains " +
+  "PythonUDF, it will be moved out and the join plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+val (udf, rest) =
+  condition.map(splitConjunctivePredicates).get.partition(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined)
+val newCondition = if (rest.isEmpty) {
+  Option.empty
+} else {
+  Some(rest.reduceLeft(And))
+}
+val newJoin = j.copy(joinType = Cross, condition = newCondition)
+joinType match {
+  case _: InnerLike =>
+Filter(udf.reduceLeft(And), newJoin)
+  case LeftSemi =>
+Project(
+  j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin))
+  case _ =>
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+}
+  } else {
+// If crossJoinEnabled is false, a RuntimeException will be thrown later when
+// the PythonUDF needs to access both sides of the join; we throw here first
+// for more readable information.
+throw new AnalysisException(s"Detected the join condition:$condition of this join " +
--- End diff --

Got it, I'll move the cross join detection logic only into 
`CheckCartesianProducts` for safety.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220198104
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala 
---
@@ -152,3 +153,60 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper {
   if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
   }
 }
+
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because for
+// other types it breaks SQL semantics to run the join condition as a filter
+// after the join. If we passed the plan on here, we would still get an invalid
+// PythonUDF RuntimeException with the message `requires attributes from more
+// than one child`; we throw here first for more readable information.
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+  }
+  if (SQLConf.get.crossJoinEnabled) {
+// If the condition expression contains a Python UDF, it will be moved out of
+// the new join conditions, and the join type will be changed to CrossJoin.
+logWarning(s"The join condition:$condition of the join plan contains " +
+  "PythonUDF, it will be moved out and the join plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+val (udf, rest) =
+  condition.map(splitConjunctivePredicates).get.partition(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined)
+val newCondition = if (rest.isEmpty) {
+  Option.empty
+} else {
+  Some(rest.reduceLeft(And))
+}
+val newJoin = j.copy(joinType = Cross, condition = newCondition)
+joinType match {
+  case _: InnerLike =>
+Filter(udf.reduceLeft(And), newJoin)
+  case LeftSemi =>
+Project(
+  j.left.output.map(_.toAttribute), Filter(udf.reduceLeft(And), newJoin))
+  case _ =>
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+}
+  } else {
+// If crossJoinEnabled is false, a RuntimeException will be thrown later when
+// the PythonUDF needs to access both sides of the join; we throw here first
+// for more readable information.
+throw new AnalysisException(s"Detected the join condition:$condition of this join " +
--- End diff --

I agree the error message is better than just saying a cross join was detected. But I'm worried about duplicating the cross join detection logic in multiple rules. E.g., what if, after pulling the Python UDF out of the join condition, we still have other join conditions, so it's not a cross join?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220128279
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1234,6 +1237,59 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because other
+// types cannot simply be resolved by adding a Cross join. If we passed the plan
+// on here, we would still get an invalid PythonUDF RuntimeException with the
+// message `requires attributes from more than one child`; we throw here first
+// for more readable information.
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+  }
+  if (SQLConf.get.crossJoinEnabled) {
--- End diff --

Maybe the current `CheckCartesianProducts` cannot be reused, because it only matches the case `Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, _)`, while if we move the new batch before it, we will have a CrossJoin here. If it is acceptable to add the Python UDF check logic in `CheckCartesianProducts`, I think your proposal can be achieved, but maybe the current logic is better than that method, because here we can log the detail of why we need a cross join. WDYT?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220111919
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1234,6 +1237,59 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because other
+// types cannot simply be resolved by adding a Cross join. If we pass the plan here, it'll
--- End diff --

Yes, will make the comment more accurate.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220110490
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1234,6 +1237,59 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because other
+// types cannot simply be resolved by adding a Cross join. If we passed the plan
+// on here, we would still get an invalid PythonUDF RuntimeException with the
+// message `requires attributes from more than one child`; we throw here first
+// for more readable information.
+throw new AnalysisException("Using PythonUDF in join condition of join type" +
+  s" $joinType is not supported.")
+  }
+  if (SQLConf.get.crossJoinEnabled) {
--- End diff --

Can we simplify it if we move the new batch before `Check Cartesian 
Products`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220109700
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1234,6 +1237,59 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+case j @ Join(_, _, joinType, condition)
+  if condition.map(splitConjunctivePredicates).getOrElse(Nil).exists(
+_.collectFirst { case udf: PythonUDF => udf }.isDefined) =>
+  if (!joinType.isInstanceOf[InnerLike] && joinType != LeftSemi) {
+// The current strategy only supports InnerLike and LeftSemi joins, because other
+// types cannot simply be resolved by adding a Cross join. If we pass the plan here, it'll
--- End diff --

It's not about cross join. For non-inner joins, it breaks SQL semantics if we run the join condition as a filter after the join.
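
A minimal sketch of that semantic point (assuming a `SparkSession` named `spark`; not code from this PR):

```scala
import spark.implicits._

val l = Seq(1, 2).toDF("a")
val r = Seq(1).toDF("b")

// A left outer join keeps the unmatched left row as (2, null).
l.join(r, l("a") === r("b"), "left_outer").show()

// Cross join + filter keeps only (1, 1); the unmatched left row is dropped,
// which is why the rule rejects non-inner join types.
l.crossJoin(r).where(l("a") === r("b")).show()
```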


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220109689
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1234,6 +1237,59 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
--- End diff --

No problem, will do this together with rebasing work.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220108380
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1234,6 +1237,59 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
   }
 }
 
+/**
+ * Correctly handle PythonUDFs which need to access both sides of a join by changing the
+ * join type to Cross.
+ */
+object HandlePythonUDFInJoinCondition extends Rule[LogicalPlan] with PredicateHelper {
--- End diff --

can we move it to `org/apache/spark/sql/catalyst/optimizer/joins.scala`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-25 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r220102842
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -547,6 +547,92 @@ def test_udf_in_filter_on_top_of_join(self):
 df = left.crossJoin(right).filter(f("a", "b"))
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"))
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+def test_udf_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"), "leftsemi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_filter_in_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2])
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)])
+
+def test_udf_and_filter_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_common_filter_in_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and common filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b"), left.a1 == right.b1])
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=1, b1=1, b2=1)])
+
+def test_udf_and_common_filter_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and common filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=1, b1=3, b2=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b"), left.a1 == right.b1], "left_semi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_not_supported_in_join_condition(self):
--- End diff --

Added this test for https://github.com/apache/spark/pull/22326/files#diff-a636a87d8843eeccca90140be91d4fafR1249.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-22 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r219675105
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -995,7 +995,8 @@ class Dataset[T] private[sql](
 // After the cloning, left and right side will have distinct expression ids.
 val plan = withPlan(
   Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
-  .queryExecution.analyzed.asInstanceOf[Join]
+  .queryExecution.analyzed
+val joinPlan = plan.collectFirst { case j: Join => j }.get
--- End diff --

For reviewers: we need this change because the rule `HandlePythonUDFInJoinCondition` breaks the assumption that analyzing the join plan will always return a bare Join. After the rule handling Python UDFs is added, a Filter or Project node may sit on top of the Join.
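
Condensed, the change described above looks like this (a sketch of the diff, with the surrounding `Dataset` declarations assumed):

```scala
// The analyzed plan may now be Project/Filter nodes on top of the Join rather
// than a bare Join, so dig out the first Join node instead of casting the root.
val plan = withPlan(
  Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)))
  .queryExecution.analyzed
val joinPlan = plan.collectFirst { case j: Join => j }.get
```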


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216606555
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

Thanks @mgaido91 for the detailed review and advice. For my part, I would probably limit the change scope to PythonUDF only, or at least to Unevaluable only. Waiting for others' advice.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216592676
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

cc @cloud-fan @gatorsmile for advice on this. It is probably OK, as it enables a case which was not supported before. But I am not sure about the added value, as performing a cross join is often infeasible in practice.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216585000
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

As the code in canEvaluateWithinJoin shows, the scope relation is: (cannotEvaluateWithinJoin = non-deterministic + Unevaluable) ⊃ Unevaluable ⊃ PythonUDF.
So, to be safe, maybe I should just limit the change scope to the smallest set, PythonUDF only. Need some advice from you, thanks :)



https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L104-L120
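
A rough sketch of that containment chain (editorial paraphrase, not the exact source behind the link): an expression can be evaluated within a join only if it is deterministic and contains nothing `Unevaluable`, and `PythonUDF` is one specific `Unevaluable` case.

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, Unevaluable}

def canEvaluateWithinJoin(e: Expression): Boolean = e match {
  case e if !e.deterministic => false  // non-deterministic conditions are out
  case _: Unevaluable        => false  // includes PythonUDF
  case e                     => e.children.forall(canEvaluateWithinJoin)
}
```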


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-11 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216583509
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

Yes, I changed this to make the test pass. The original thought was that non-deterministic expressions in join conditions are not supported yet, so that's no big problem.
https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala#L105

https://github.com/apache/spark/blob/0736e72a66735664b191fc363f54e3c522697dba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala#L1158-L1159
But now I think I should be more careful about this and limit the cross join changes to the PythonUDF case only. WDYT @mgaido91? Thanks.



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216151151
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 ---
@@ -1149,6 +1149,51 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ nonDeterministic)
   }
 
+  private def tryToGetCrossType(commonJoinCondition: Seq[Expression], j: LogicalPlan) = {
+if (SQLConf.get.crossJoinEnabled) {
+  // If a condition expression is unevaluable, it will be removed from
+  // the new join conditions; if all conditions are unevaluable, we should
+  // change the join type to CrossJoin.
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+"plan is unevaluable, it will be ignored and the join plan will be " +
+s"turned to cross join. This plan shows below:\n $j")
+  Cross
+} else {
+  // If crossJoinEnabled is false, an AnalysisException will be thrown by
+  // CheckCartesianProducts; we throw here first for more readable information.
+  throw new AnalysisException("Detected the whole commonJoinCondition:" +
+s"$commonJoinCondition of the join plan is unevaluable, we need to cast the " +
+"join to cross join by setting the configuration variable " +
+s"${SQLConf.CROSS_JOINS_ENABLED.key}=true")
+}
+  }
+
+  /**
+   * Generate new join by pushing down the side only join filter, split 
commonJoinCondition
--- End diff --

nit: `filters`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-09 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216151223
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 ---
@@ -1153,12 +1154,35 @@ class FilterPushdownSuite extends PlanTest {
   "x.a".attr === Rand(10) && "y.b".attr === 5))
 val correctAnswer =
   x.where("x.a".attr === 5).join(y.where("y.a".attr === 5 && 
"y.b".attr === 5),
-condition = Some("x.a".attr === Rand(10)))
+joinType = Cross).where("x.a".attr === Rand(10))
--- End diff --

this is not a change we want, right? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attri...

2018-09-08 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r216127932
  
--- Diff: python/pyspark/sql/tests.py ---
@@ -547,6 +547,74 @@ def test_udf_in_filter_on_top_of_join(self):
 df = left.crossJoin(right).filter(f("a", "b"))
 self.assertEqual(df.collect(), [Row(a=1, b=1)])
 
+def test_udf_in_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1)])
+right = self.spark.createDataFrame([Row(b=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"))
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, b=1)])
+
+def test_udf_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, f("a", "b"), "leftsemi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_filter_in_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2])
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1, b=2, b1=1, b2=2)])
+
+def test_udf_and_filter_in_left_semi_join_condition(self):
+# regression test for SPARK-25314
+# test the complex scenario with both udf(non-deterministic)
+# and normal filter(deterministic)
+from pyspark.sql.functions import udf
+left = self.spark.createDataFrame([Row(a=1, a1=1, a2=1), Row(a=2, a1=2, a2=2)])
+right = self.spark.createDataFrame([Row(b=1, b1=1, b2=1), Row(b=2, b1=1, b2=2)])
+f = udf(lambda a, b: a == b, BooleanType())
+df = left.join(right, [f("a", "b1"), left.a == 1, right.b == 2], "left_semi")
+with self.sql_conf({"spark.sql.crossJoin.enabled": True}):
+self.assertEqual(df.collect(), [Row(a=1, a1=1, a2=1)])
+
+def test_udf_and_common_filter_in_join_condition(self):
--- End diff --

Added these two tests for the comment in https://github.com/apache/spark/pull/22326#discussion_r216127673.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org