[GitHub] spark pull request #23248: [SPARK-26293][SQL] Cast exception when having pyt...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r240079120

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
     expressions.flatMap(collectEvaluableUDFs)
   }
-  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case plan: LogicalPlan => extract(plan)
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    // SPARK-26293: A subquery will be rewritten into join later, and will go through this rule
+    // eventually. Here we skip subquery, as Python UDF only needs to be extracted once.
+    case _: Subquery => plan
--- End diff --

Basically, we want to ensure this rule runs once and only once. If in the future another rule or function calls `Optimizer.this.execute(plan)`, this rule will need to be fixed again... The implementation carries a very strong hidden assumption, which looks risky in the long term. The current fix is fine for backporting to 2.4.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r240041688

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
[...]
+    case _: Subquery => plan
--- End diff --

I'm not sure it is totally OK to skip `Subquery` in all optimizer rules. For `ExtractPythonUDFs` I think it is OK, because `ExtractPythonUDFs` runs after the rules in `RewriteSubquery`: we can skip `ExtractPythonUDFs` on `Subquery` here and extract the Python UDFs after the subqueries are rewritten into joins. But rules that run before `RewriteSubquery` and skip `Subquery` never get a chance to run on the subquery after it has been rewritten into a join.
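The ordering argument above can be sketched with a toy model in Python. Everything here is hypothetical: `Subquery`, `WithSubqueryExpr`, `Join` and the three-step "optimizer" (an `early` rule, a stand-in for the `RewriteSubquery` batch, then a `late` rule) are simplified stand-ins, not Catalyst classes. The model assumes, as the comment implies, that a subquery plan held inside an expression is not visited by rules that traverse plan children, so the only chances to see the subquery body are the standalone `Subquery` pass and the join produced by the rewrite.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class Subquery:
    child: str  # hypothetical marker wrapper used while a subquery is optimized on its own


@dataclass(frozen=True)
class WithSubqueryExpr:
    outer: str
    sub: str  # held in an expression, so child traversal does not reach it


@dataclass(frozen=True)
class Join:
    left: str
    right: str


def visible_leaves(plan) -> List[str]:
    """Leaves a child-traversing rule would visit in this toy model."""
    if isinstance(plan, Subquery):
        return [plan.child]
    if isinstance(plan, WithSubqueryExpr):
        return [plan.outer]
    if isinstance(plan, Join):
        return [plan.left, plan.right]
    return [plan]


def rewrite_subquery(plan):
    # Stand-in for the RewriteSubquery batch: subquery expression -> join.
    if isinstance(plan, WithSubqueryExpr):
        return Join(plan.outer, plan.sub)
    return plan


def run_optimizer(plan, seen: Dict[str, List[str]], skip_subquery: bool):
    """Toy optimizer: rule 'early' runs before the rewrite, rule 'late' after it."""
    for step in ("early", "rewrite", "late"):
        if step == "rewrite":
            plan = rewrite_subquery(plan)
        elif skip_subquery and isinstance(plan, Subquery):
            continue  # like `case _: Subquery => plan`
        else:
            seen[step].extend(visible_leaves(plan))
    return plan


def optimize_query(skip_subquery: bool) -> Dict[str, List[str]]:
    seen: Dict[str, List[str]] = {"early": [], "late": []}
    # Like OptimizeSubqueries: run the whole optimizer on the subquery body first.
    body = run_optimizer(Subquery("body"), seen, skip_subquery).child
    # Then optimize the outer query, whose subquery gets rewritten into a join.
    run_optimizer(WithSubqueryExpr("outer", body), seen, skip_subquery)
    return seen
```

With skipping enabled, the `early` rule never sees `body`, while the `late` rule sees it exactly once, through the join. Without skipping, `late` processes `body` twice, which is the double application that breaks a non-idempotent rule.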
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r240026330

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
[...]
+    case _: Subquery => plan
--- End diff --

I think you have a point here. If a subquery will be converted into a join anyway, why do we need to optimize it ahead of time? That's something we can discuss later. cc @dilipbiswal for the subquery question.
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239925749

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
[...]
+    case _: Subquery => plan
--- End diff --

I see. If it's common for other rules to skip `Subquery`, I guess it's OK to do it here as well. But it would definitely help to establish some guidance, maybe something like "All optimizer rules should skip `Subquery` because `OptimizeSubqueries` will execute them anyway"?
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239686156

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
[...]
+    case _: Subquery => plan
--- End diff --

I agree it's a bit confusing, but that's how `Subquery` is designed to work; see how `RemoveRedundantAliases` handles `Subquery`. Making `ExtractPythonUDFs` idempotent is sufficient. Skipping `Subquery` is just for double safety, and may give a small performance improvement, since the rule runs less often. In general, I think we should skip `Subquery` here. This is why we created `Subquery`: we expect rules that should not be executed on a subquery to skip it. I'll check other rules later and see whether they need to skip `Subquery` too.
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239565253

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
@@ -131,8 +131,20 @@ object ExtractPythonUDFs extends Rule[LogicalPlan] with PredicateHelper {
[...]
+    case _: Subquery => plan
--- End diff --

Personally I find it a bit confusing to put two seemingly unrelated things together (`Subquery` and `ExtractPythonUDFs`). I wonder if it's sufficient to make `ExtractPythonUDFs` idempotent?
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239441136

--- Diff: python/pyspark/sql/tests/test_udf.py ---
@@ -23,7 +23,7 @@
 from pyspark import SparkContext
 from pyspark.sql import SparkSession, Column, Row
-from pyspark.sql.functions import UserDefinedFunction
+from pyspark.sql.functions import UserDefinedFunction, udf
--- End diff --

Ah, yeah. It's okay, and I think it's a good time to clean up while we are here, now that the tests are broken down into multiple files.
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239430315

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala ---
@@ -60,8 +60,12 @@ private class BatchIterator[T](iter: Iterator[T], batchSize: Int)
 /**
  * A logical plan that evaluates a [[PythonUDF]].
  */
-case class ArrowEvalPython(udfs: Seq[PythonUDF], output: Seq[Attribute], child: LogicalPlan)
-  extends UnaryNode
+case class ArrowEvalPython(
+    udfs: Seq[PythonUDF],
+    output: Seq[Attribute],
+    child: LogicalPlan) extends UnaryNode {
+  override def producedAttributes: AttributeSet = AttributeSet(output.drop(child.output.length))
--- End diff --

This is a different but related fix, so that `missingAttributes` is calculated correctly.
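The effect of the `producedAttributes` override can be sketched with plain Python sets. This is a hedged toy model, not Spark code: it assumes the usual Catalyst bookkeeping in which an operator's missing input is its referenced attributes minus its child's output minus its produced attributes, and that `ArrowEvalPython`'s `output` (the child's columns followed by one result column per UDF) counts toward its references because it is a constructor field. The column names `a`, `b` and `udf_result` are hypothetical.

```python
from typing import List, Set


def missing_input(references: Set[str], child_output: List[str], produced: Set[str]) -> Set[str]:
    """Toy version of the check: references - child output - produced attributes."""
    return references - set(child_output) - produced


child_output = ["a", "b"]
# Assumption: the node's output is the child's output plus one result column per UDF.
output = child_output + ["udf_result"]
udf_inputs = {"a"}  # hypothetical: the UDF reads column `a`
# Assumption: attributes in `output` are counted as references of the node.
references = udf_inputs | set(output)

# Without the override nothing is produced, so the UDF result looks "missing".
before_fix = missing_input(references, child_output, produced=set())
# With the override, the columns after the child's output are produced by this node.
after_fix = missing_input(references, child_output, produced=set(output[len(child_output):]))
```

Here `before_fix` is `{"udf_result"}` and `after_fix` is empty: dropping the first `child.output.length` entries of `output`, as the diff does, leaves exactly the UDF result columns as produced attributes.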
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/23248#discussion_r239430084

--- Diff: python/pyspark/sql/tests/test_udf.py ---
@@ -23,7 +23,7 @@
[...]
+from pyspark.sql.functions import UserDefinedFunction, udf
--- End diff --

Add the import here, as a lot of tests use it.
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23248

[SPARK-26293][SQL] Cast exception when having python udf in subquery

## What changes were proposed in this pull request?

This is a regression introduced by https://github.com/apache/spark/pull/22104 in Spark 2.4.0.

When a Python UDF is used in a subquery, the query fails with an exception:

```
Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.AttributeReference cannot be cast to org.apache.spark.sql.catalyst.expressions.PythonUDF
    at scala.collection.immutable.Stream.map(Stream.scala:414)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:98)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:815)
...
```

https://github.com/apache/spark/pull/22104 turned `ExtractPythonUDFs` from a physical rule into an optimizer rule. However, the two kinds of rule behave differently: a physical rule always runs once, whereas an optimizer rule may be applied twice to the same query tree, even if it sits in a batch that runs only once.

For a subquery, the `OptimizeSubqueries` rule executes the entire optimizer on the query plan inside the subquery. Later on, the subquery is turned into a join, and the optimizer rules are applied to it again.

Unfortunately, the `ExtractPythonUDFs` rule is not idempotent. When it is applied twice to the query plan inside a subquery, the second pass extracts Python UDFs from the Python exec plans created by the first pass, producing a malformed plan.

This PR proposes 2 changes, to be double safe:
1. `ExtractPythonUDFs` should skip Python exec plans, to make the rule idempotent.
2. `ExtractPythonUDFs` should skip subquery.

## How was this patch tested?

A new test.
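The double application described above can be reproduced with a toy model in Python. This is a hedged sketch, not Spark's code: `Attr`, `PythonUDF`, `Scan`, `Project`, `EvalPython`, `transform_up`, `extract` and `check` are hypothetical simplifications of the Catalyst nodes and of the rule. It models only the first change (skipping Python exec plans). The naive rule treats an `EvalPython` node's UDF list like any other expression list, so a second pass replaces those UDFs with attribute references, which is the shape behind the `AttributeReference cannot be cast to PythonUDF` error.

```python
from dataclasses import dataclass
from functools import partial
from typing import Callable, Tuple


# Hypothetical toy plan/expression nodes (not Catalyst classes).
@dataclass(frozen=True)
class Attr:
    name: str


@dataclass(frozen=True)
class PythonUDF:
    name: str
    arg: str


@dataclass(frozen=True)
class Scan:
    cols: Tuple[str, ...]


@dataclass(frozen=True)
class Project:
    exprs: Tuple[object, ...]
    child: object


@dataclass(frozen=True)
class EvalPython:
    udfs: Tuple[object, ...]  # should only ever hold PythonUDF
    child: object


def expressions(node):
    if isinstance(node, Project):
        return node.exprs
    if isinstance(node, EvalPython):
        return node.udfs
    return ()


def with_new(node, exprs, child):
    return Project(exprs, child) if isinstance(node, Project) else EvalPython(exprs, child)


def transform_up(node, rule: Callable):
    """Apply `rule` to the children first, then to the node itself."""
    if isinstance(node, (Project, EvalPython)):
        node = with_new(node, expressions(node), transform_up(node.child, rule))
    return rule(node)


def extract(node, skip_python_exec: bool):
    if skip_python_exec and isinstance(node, EvalPython):
        return node  # change 1: never re-extract from an already-built Python exec node
    udfs = tuple(e for e in expressions(node) if isinstance(e, PythonUDF))
    if not udfs:
        return node
    # Replace each UDF with a reference to its result; evaluate the UDFs below this node.
    new_exprs = tuple(Attr(e.name + "_out") if isinstance(e, PythonUDF) else e
                      for e in expressions(node))
    return with_new(node, new_exprs, EvalPython(udfs, node.child))


def check(node):
    """Mimics the cast at execution time: every entry in `udfs` must be a PythonUDF."""
    if isinstance(node, EvalPython):
        for u in node.udfs:
            if not isinstance(u, PythonUDF):
                raise TypeError(f"{type(u).__name__} cannot be cast to PythonUDF")
    if isinstance(node, (Project, EvalPython)):
        check(node.child)


plan = Project((Attr("a"), PythonUDF("f", "a")), Scan(("a",)))
naive = partial(extract, skip_python_exec=False)
fixed = partial(extract, skip_python_exec=True)

naive_twice = transform_up(transform_up(plan, naive), naive)
fixed_once = transform_up(plan, fixed)
fixed_twice = transform_up(fixed_once, fixed)
```

On this plan, `check(naive_twice)` raises `TypeError` because the outer `EvalPython` now holds an `Attr`, while `fixed_twice == fixed_once` holds: the guarded rule is idempotent, so running it a second time after the subquery rewrite is harmless.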
You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark python

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23248.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23248

----
commit 9477fb09b850b981862cb72b0ebdebc5b404a082
Author: Wenchen Fan
Date:   2018-12-06T11:16:04Z

    python udf in subquery

----