[ https://issues.apache.org/jira/browse/SPARK-25060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573866#comment-16573866 ]

Bryan Cutler commented on SPARK-25060:
--------------------------------------

I believe this was already brought up on the dev list:
https://lists.apache.org/thread.html/a83e58dceed7b9b277070d42a5c85b319cb3ab77a2e2f00c4a7f3deb@<dev.spark.apache.org>
cc [~LI,Xiao]

> PySpark UDF in case statement is always run
> -------------------------------------------
>
>                 Key: SPARK-25060
>                 URL: https://issues.apache.org/jira/browse/SPARK-25060
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Ryan Blue
>            Priority: Major
>
> When evaluating a case statement with a Python UDF, Spark always runs the UDF,
> even for rows where the case doesn't take the branch containing the UDF call.
> Here's a repro case:
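> {code:lang=python}
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StringType
>
> # In the pyspark shell `spark` already exists; getOrCreate() reuses it
> spark = SparkSession.builder.getOrCreate()
>
> def fail_if_x(s):
>     # Fails on purpose if it is ever called with 'x'
>     assert s != 'x'
>     return s
>
> spark.udf.register("fail_if_x", fail_if_x, StringType())
> df = spark.createDataFrame([(1, 'x'), (2, 'y')], ['id', 'str'])
> df.createOrReplaceTempView("data")  # replaces the deprecated registerTempTable
> # Row 1 never takes the THEN branch, yet fail_if_x('x') is still evaluated
> spark.sql("select id, case when str <> 'x' then fail_if_x(str) else null end "
>           "from data").show()
> {code}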
> This produces the following error:
> {code}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 189, in main
>     process()
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 184, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 104, in <lambda>
>     func = lambda _, it: map(mapper, it)
>   File "<string>", line 1, in <lambda>
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
>     return lambda *a: f(*a)
>   File "<ipython-input-1-91ba29d7e46f>", line 4, in fail_if_x
> AssertionError
> {code}
> This is because Python UDFs are extracted from expressions and run in a
> BatchEvalPython node, which is inserted as the child of the node containing the
> expression (here, the Project):
> {code}
> == Physical Plan ==
> CollectLimit 21
> +- *Project [id#0L, CASE WHEN NOT (str#1 = x) THEN pythonUDF0#14 ELSE null END AS CASE WHEN (NOT (str = x)) THEN fail_if_x(str) ELSE CAST(NULL AS STRING) END#6]
>    +- BatchEvalPython [fail_if_x(str#1)], [id#0L, str#1, pythonUDF0#14]
>       +- Scan ExistingRDD[id#0L,str#1]
> {code}
> This doesn't affect correctness, but the behavior doesn't match the Scala API,
> where a case statement can be used to keep data that would make a UDF fail from
> ever reaching the UDF.
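
To make the physical plan in the issue concrete, here is a toy, pure-Python model of the two evaluation orders. It is not Spark code and not how Spark implements either node; the helpers `lazy_case` and `extracted_case` are hypothetical names. `lazy_case` mimics the branch-guarded behavior described for the Scala API, while `extracted_case` mimics BatchEvalPython followed by Project: the UDF column is computed for every row before the CASE ever picks a branch.

```python
# Toy model only: plain Python tuples stand in for rows; this is not Spark code.

def fail_if_x(s):
    assert s != 'x'
    return s

rows = [(1, 'x'), (2, 'y')]

def lazy_case(rows, udf):
    """Branch-guarded CASE: the UDF runs only for rows where str <> 'x'."""
    return [(i, udf(s) if s != 'x' else None) for i, s in rows]

def extracted_case(rows, udf):
    """Models BatchEvalPython followed by Project."""
    # BatchEvalPython: append pythonUDF0 to every row, unconditionally.
    with_udf = [(i, s, udf(s)) for i, s in rows]
    # Project: CASE WHEN only picks between already-computed values.
    return [(i, out if s != 'x' else None) for i, s, out in with_udf]

print(lazy_case(rows, fail_if_x))  # [(1, None), (2, 'y')]

try:
    extracted_case(rows, fail_if_x)
except AssertionError:
    print("fail_if_x was called with 'x' before CASE could skip it")
```

Both functions apply the same CASE condition; the only difference is whether the UDF call happens before or after that condition is checked, which is exactly what the extracted BatchEvalPython node changes.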
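
Since the case guard is not honored for Python UDFs here, one possible workaround (a sketch, assuming it is acceptable to change the UDF itself) is to move the guard into the Python function so it is safe to evaluate on every row. `fail_if_x_guarded` is a hypothetical name; the registration shown in the comment uses the same `spark.udf.register` call as the repro.

```python
# Hypothetical workaround sketch: guard inside the UDF instead of relying on CASE.

def fail_if_x(s):
    assert s != 'x'
    return s

def fail_if_x_guarded(s):
    """Return None for inputs the CASE would have skipped, else call fail_if_x."""
    # SQL `NULL <> 'x'` is not true, so the original query returns NULL for NULL too.
    if s is None or s == 'x':
        return None
    return fail_if_x(s)

# In PySpark this could be registered like the original UDF, e.g.
#   spark.udf.register("fail_if_x_guarded", fail_if_x_guarded, StringType())
# and the query could then call fail_if_x_guarded(str) without a case statement.

results = [fail_if_x_guarded(s) for s in ['x', 'y', None]]
print(results)  # [None, 'y', None]
```

Returning None for the skipped inputs matches what the `else null` branch of the original query produces for those rows, so the guarded UDF can replace the whole case expression.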



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
