[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-09 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r240079120
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

Basically, we want to ensure this rule runs once and only once. In the
future, if another rule or function calls `Optimizer.this.execute(plan)`,
this rule will need to be fixed again... The implementation relies on a very
strong hidden assumption, which looks risky in the long term.
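
One way to make that hidden assumption explicit is a test that re-applies a rule to its own output. The sketch below is a plain-Python toy, not Spark code: `first_divergence`, `append_eval` and `normalize` are hypothetical names, and plans are modeled as tuples of strings.

```python
from typing import Callable, Optional, Tuple

Plan = Tuple[str, ...]  # toy stand-in for a logical plan


def first_divergence(rule: Callable[[Plan], Plan], plan: Plan,
                     max_runs: int = 3) -> Optional[int]:
    """Return the run (2..max_runs) at which re-applying `rule` changes its
    own output, or None if the output stays stable."""
    current = rule(plan)
    for run in range(2, max_runs + 1):
        rerun = rule(current)
        if rerun != current:
            return run
        current = rerun
    return None


def append_eval(plan: Plan) -> Plan:
    # Not idempotent: every application adds another "eval" node.
    return plan + ("eval",)


def normalize(plan: Plan) -> Plan:
    # Idempotent: a second application has nothing left to change.
    return tuple(sorted(set(plan)))


print(first_divergence(append_eval, ("scan",)))
print(first_divergence(normalize, ("scan", "filter", "scan")))
```

The first call reports run 2 and the second reports `None`. A check of this shape would flag a rule that silently depends on running exactly once, whichever caller ends up re-running the optimizer.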

The current fix is fine for backporting to 2.4. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-09 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r240041688
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

I'm not sure it is entirely OK to skip `Subquery` in all optimizer rules.

For `ExtractPythonUDFs` I think it is OK, because `ExtractPythonUDFs` runs
after the rules in `RewriteSubquery`. So we can skip `ExtractPythonUDFs`
here and extract Python UDFs after the subqueries are rewritten into joins.

But for rules that run before `RewriteSubquery`, if we skip them on
`Subquery`, they never get a chance to run after the subqueries are
rewritten into joins.
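
The ordering argument can be sketched with a toy pipeline. This is plain Python, not Catalyst: `EarlyRule` is a hypothetical rule name, and the model assumes a rule sees the subquery body either while the body is still wrapped in `Subquery` (unless the rule skips that node) or after the rewrite into a join (only if the rule is placed after the rewrite).

```python
from typing import List, Set


def rules_seeing_subquery_body(pipeline: List[str], rewrite_rule: str,
                               skipping: Set[str]) -> List[str]:
    """Names of the rules that get to process the subquery body at least once."""
    rewrite_pos = pipeline.index(rewrite_rule)
    seen_by = []
    for pos, rule in enumerate(pipeline):
        if rule == rewrite_rule:
            continue
        # After the rewrite the body is part of the main plan, so later rules see it.
        after_rewrite = pos > rewrite_pos
        # Before the rewrite the body is only reachable through the Subquery wrapper.
        if after_rewrite or rule not in skipping:
            seen_by.append(rule)
    return seen_by


pipeline = ["EarlyRule", "RewriteSubquery", "ExtractPythonUDFs"]

print(rules_seeing_subquery_body(pipeline, "RewriteSubquery", {"ExtractPythonUDFs"}))
print(rules_seeing_subquery_body(pipeline, "RewriteSubquery", {"EarlyRule", "ExtractPythonUDFs"}))
```

In the second call `EarlyRule` drops out of the result: it skips the wrapped body and is never run again once the body has been inlined.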


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-08 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r240026330
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

I think you have a point here. If a subquery will be converted to a join,
why do we need to optimize the subquery ahead of time?

Anyway, that's something we need to discuss later. cc @dilipbiswal for the 
subquery question.


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-07 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239925749
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

I see. If it's common to skip `Subquery` in other rules, I guess it's OK to
put it here as well. But it would definitely be helpful to establish some
kind of guidance, maybe something like "All optimizer rules should skip
Subquery because OptimizeSubqueries will execute them anyway"?


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239686156
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

I agree it's a bit confusing, but that's how `Subquery` is designed to 
work. See how `RemoveRedundantAliases` catches `Subquery`.

Making `ExtractPythonUDFs` idempotent is sufficient. Skipping `Subquery` is
just an extra safeguard, and it may give a small perf improvement, since
this rule will run less often.

In general, I think we should skip `Subquery` here. This is why we created
`Subquery`: we expect rules that don't want to be executed on a subquery to
skip it. I'll check more rules later and see if they need to skip `Subquery`.
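
A minimal sketch of the skip-on-`Subquery` idea, in plain Python rather than Scala. `Relation`, `CountingRule` and `optimize_query_with_subquery` are hypothetical names, and the two-phase flow is a simplified assumption about how a subquery body is first optimized on its own and then again as part of the outer plan.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Relation:
    name: str


@dataclass(frozen=True)
class Subquery:
    """Toy marker node wrapping the root of a subquery plan."""
    child: object


class CountingRule:
    """A rule that does no rewriting and only counts how often it really runs."""

    def __init__(self, skip_subquery: bool) -> None:
        self.skip_subquery = skip_subquery
        self.runs = 0

    def __call__(self, plan):
        if self.skip_subquery and isinstance(plan, Subquery):
            return plan  # leave the wrapped body for the later pass
        self.runs += 1
        return plan


def optimize_query_with_subquery(rule, inner):
    # Phase 1: the subquery body is optimized on its own, wrapped in the marker.
    optimized_inner = rule(Subquery(inner)).child
    # Phase 2: the body is now part of the outer plan, so the rule sees it again.
    return rule(optimized_inner)


guarded = CountingRule(skip_subquery=True)
unguarded = CountingRule(skip_subquery=False)
optimize_query_with_subquery(guarded, Relation("v"))
optimize_query_with_subquery(unguarded, Relation("v"))
print(guarded.runs, unguarded.runs)
```

With the guard the rule does its work once, on the inlined plan. Without it the same nodes are processed twice, which is only safe if the rule is idempotent.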


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239565253
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

Personally I find it a bit confusing when two seemingly unrelated things
(`Subquery` and `ExtractPythonUDFs`) are put together.

I wonder if it's sufficient to make ExtractPythonUDFs idempotent?


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239441136
  
--- Diff: python/pyspark/sql/tests/test_udf.py ---
@@ -23,7 +23,7 @@
 
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import UserDefinedFunction
+from pyspark.sql.functions import UserDefinedFunction, udf
--- End diff --

Ah, yea. It's okay and I think it's good timing to clean up while we are 
here, and while it's broken down into multiple test files now.


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239430315
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -60,8 +60,12 @@ private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
 /**
  * A logical plan that evaluates a [[PythonUDF]].
  */
-case class ArrowEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan)
-  extends UnaryNode
+case class ArrowEvalPython(
+    udfs: Seq[PythonUDF],
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
--- End diff --

A different but related fix, so that `missingAttributes` is calculated
correctly.
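
A rough illustration of why declaring produced attributes matters, using Python sets instead of Catalyst's `AttributeSet`. It assumes missing attributes are computed as the node's references minus the child's output minus the produced attributes, and that the node's own `output` counts among its references. The column names and the helpers `produced_attributes` and `missing_input` are made up for the example.

```python
from typing import Iterable, List, Set


def produced_attributes(output: List[str], child_output: List[str]) -> Set[str]:
    # Mirrors output.drop(child.output.length): the node's output is the child's
    # columns followed by one new column per evaluated UDF.
    return set(output[len(child_output):])


def missing_input(references: Iterable[str], child_output: Iterable[str],
                  produced: Iterable[str]) -> Set[str]:
    # Attributes the node refers to that neither its child nor the node itself supplies.
    return set(references) - set(child_output) - set(produced)


child_output = ["id"]
output = child_output + ["pythonUDF0"]
references = ["id", "pythonUDF0"]  # assumed: the UDF argument plus the node's own output

without_fix = missing_input(references, child_output, set())
with_fix = missing_input(references, child_output,
                         produced_attributes(output, child_output))
print(without_fix, with_fix)
```

Without the override the UDF result column is reported as missing. With it the set is empty, because the node itself is known to produce that column.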


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23248#discussion_r239430084
  
--- Diff: python/pyspark/sql/tests/test_udf.py ---
@@ -23,7 +23,7 @@
 
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import UserDefinedFunction
+from pyspark.sql.functions import UserDefinedFunction, udf
--- End diff --

Add the import here, as a lot of tests use it.


---




[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...

2018-12-06 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23248

[SPARK-26293][SQL] Cast exception when having python udf in subquery

## What changes were proposed in this pull request?

This is a regression introduced by
https://github.com/apache/spark/pull/22104 in Spark 2.4.0.

When we have a Python UDF in a subquery, we hit an exception:
```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
    at scala.collection.immutable.Stream.map(Stream.scala:414)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
    ...
```

https://github.com/apache/spark/pull/22104 turned `ExtractPythonUDFs` from
a physical rule into an optimizer rule. However, the two kinds of rule
differ: a physical rule always runs once, while an optimizer rule may be
applied twice to a query tree even if the rule is located in a batch that
only runs once.

For a subquery, the `OptimizeSubqueries` rule executes the entire optimizer
on the query plan inside the subquery. Later on, the subquery is turned into
a join, and the optimizer rules are applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it's
applied twice to a query plan inside a subquery, it produces a malformed
plan, because the second pass extracts Python UDFs from the Python exec
plans that the first pass created.

This PR proposes 2 changes, for double safety:
1. `ExtractPythonUDFs` should skip Python exec plans, to make the rule
idempotent.
2. `ExtractPythonUDFs` should skip subqueries.
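
The failure mode and the first change can be illustrated with a toy model. This is plain Python and not the actual rule: `Scan`, `Project`, `EvalPython` and `extract` are hypothetical stand-ins, expressions are strings, and a UDF call is anything that starts with `udf(`.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Scan:
    columns: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    exprs: Tuple[str, ...]
    child: "Plan"


@dataclass(frozen=True)
class EvalPython:
    udfs: Tuple[str, ...]  # should only ever hold UDF calls
    child: "Plan"


Plan = Union[Scan, Project, EvalPython]


def is_udf(expr: str) -> bool:
    return expr.startswith("udf(")


def extract(node: Plan, skip_eval_nodes: bool) -> Plan:
    if isinstance(node, Scan):
        return node
    child = extract(node.child, skip_eval_nodes)
    if isinstance(node, EvalPython):
        if skip_eval_nodes:
            # Change 1: an eval node is already the result of extraction.
            return EvalPython(node.udfs, child)
        exprs = node.udfs
    else:
        exprs = node.exprs
    udfs = tuple(e for e in exprs if is_udf(e))
    if not udfs:
        return type(node)(exprs, child)
    # Evaluate the UDFs in a new node below, and refer to their results above.
    replaced = tuple(f"ref:{e}" if is_udf(e) else e for e in exprs)
    return type(node)(replaced, EvalPython(udfs, child))


plan = Project(("udf(id)",), Scan(("id",)))

naive_once = extract(plan, skip_eval_nodes=False)
naive_twice = extract(naive_once, skip_eval_nodes=False)
fixed_once = extract(plan, skip_eval_nodes=True)
fixed_twice = extract(fixed_once, skip_eval_nodes=True)

print(naive_twice.child.udfs)  # the eval node now holds a reference, not a UDF
print(fixed_twice == fixed_once)
```

In the naive variant the second pass leaves an eval node whose `udfs` field contains a plain reference, which is the toy analogue of the `AttributeReference cannot be cast to PythonUDF` error above. With the skip in place a second pass returns the plan unchanged.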

## How was this patch tested?

a new test.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark python

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23248


commit 9477fb09b850b981862cb72b0ebdebc5b404a082
Author: Wenchen Fan 
Date:   2018-12-06T11:16:04Z

python udf in subquery




---
