[ https://issues.apache.org/jira/browse/SPARK-25060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573866#comment-16573866 ]
Bryan Cutler commented on SPARK-25060:
--------------------------------------

I believe this was brought up here: https://lists.apache.org/thread.html/a83e58dceed7b9b277070d42a5c85b319cb3ab77a2e2f00c4a7f3deb@<dev.spark.apache.org>

cc [~LI,Xiao]

> PySpark UDF in case statement is always run
> -------------------------------------------
>
>                 Key: SPARK-25060
>                 URL: https://issues.apache.org/jira/browse/SPARK-25060
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.1
>            Reporter: Ryan Blue
>            Priority: Major
>
> When evaluating a CASE statement with a Python UDF, Spark always runs the UDF, even when the CASE does not take the branch containing the UDF call. Here's a repro case:
> {code:lang=python}
> from pyspark.sql.types import StringType
>
> def fail_if_x(s):
>     assert s != 'x'
>     return s
>
> spark.udf.register("fail_if_x", fail_if_x, StringType())
>
> df = spark.createDataFrame([(1, 'x'), (2, 'y')], ['id', 'str'])
> df.registerTempTable("data")
>
> spark.sql("select id, case when str <> 'x' then fail_if_x(str) else null end from data").show()
> {code}
> This produces the following error:
> {code}
> Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 189, in main
>     process()
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 184, in process
>     serializer.dump_stream(func(split_index, iterator), outfile)
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 104, in <lambda>
>     func = lambda _, it: map(mapper, it)
>   File "<string>", line 1, in <lambda>
>   File "/mnt3/yarn/usercache/rblue/appcache/application_1533057049763_100912/container_1533057049763_100912_01_000002/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
>     return lambda *a: f(*a)
>   File "<ipython-input-1-91ba29d7e46f>", line 4, in fail_if_x
> AssertionError
> {code}
> This happens because Python UDFs are extracted from expressions and evaluated eagerly in a BatchEvalPython node, inserted as a child of the node containing the expression:
> {code}
> == Physical Plan ==
> CollectLimit 21
> +- *Project [id#0L, CASE WHEN NOT (str#1 = x) THEN pythonUDF0#14 ELSE null END AS CASE WHEN (NOT (str = x)) THEN fail_if_x(str) ELSE CAST(NULL AS STRING) END#6]
>    +- BatchEvalPython [fail_if_x(str#1)], [id#0L, str#1, pythonUDF0#14]
>       +- Scan ExistingRDD[id#0L,str#1]
> {code}
> This doesn't affect correctness, but the behavior doesn't match the Scala API, where a CASE statement can be used to keep data that would cause a UDF to fail from being passed into the UDF.
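A minimal workaround sketch, not from the original report: since the planner may evaluate the Python UDF for every row regardless of the CASE branch, the UDF itself can be made tolerant of the input the CASE was meant to filter out. This assumes the {{spark}} session and the {{data}} temp table registered in the repro above; the name {{fail_if_x_guarded}} is hypothetical.

{code:lang=python}
from pyspark.sql.types import StringType

def fail_if_x_guarded(s):
    # Return None for the problematic value instead of asserting, so eager
    # evaluation in BatchEvalPython cannot raise. (Hypothetical helper,
    # not part of the original report.)
    if s == 'x':
        return None
    return s

spark.udf.register("fail_if_x_guarded", fail_if_x_guarded, StringType())

# Same query shape as the repro; the CASE still selects the branch, but the
# UDF no longer fails when it is evaluated on the filtered-out rows.
spark.sql(
    "select id, "
    "case when str <> 'x' then fail_if_x_guarded(str) else null end "
    "from data"
).show()
{code}

This trades the short-circuit semantics the reporter expected for a UDF that is safe under eager evaluation; it does not change the plan, only makes the BatchEvalPython pass harmless.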