Abdeali Kothari created SPARK-24458:
---------------------------------------

             Summary: Invalid PythonUDF check_1(), requires attributes from 
more than one child
                 Key: SPARK-24458
                 URL: https://issues.apache.org/jira/browse/SPARK-24458
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.0
         Environment: Spark 2.3.0 (local mode)

Mac OSX
            Reporter: Abdeali Kothari


I was trying out a very large query execution plan I have and I got the error:

 
{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o359.simpleString.
: java.lang.RuntimeException: Invalid PythonUDF check_1(), requires attributes 
from more than one child.
 at scala.sys.package$.error(package.scala:27)
 at 
org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
 at 
org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
 at scala.collection.immutable.Stream.foreach(Stream.scala:594)
 at 
org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
 at 
org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
 at 
org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
 at 
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
 at 
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
 at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
 at 
org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
 at 
org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
 at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
 at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
 at 
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
 at scala.collection.immutable.List.foldLeft(List.scala:84)
 at 
org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
 at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
 at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
 at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
 at 
org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
 at 
org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
 at 
org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
 at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
 at py4j.Gateway.invoke(Gateway.java:282)
 at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
 at py4j.commands.CallCommand.execute(CallCommand.java:79)
 at py4j.GatewayConnection.run(GatewayConnection.java:214)
 at java.lang.Thread.run(Thread.java:748){code}
I get a dataframe (df) after a lot of PythonUDFs running on a CSV dataset and I 
drop some columns in between. Finally, I create 3 python lists (for example, 
[0.1, 0.2, 0.3, ...] which I convert to a Spark DataFrame using createDataFrame.

I join all three list-converted-dataframes using crossJoin() and then do a 
crossJoin with the original data I have. Then I run a Python UDF which is 
check_1. check_1 is something like:
{code:java}
def check_1():
    if 1 == 1:
        return 'yes'
    else:
        return 'no'{code}
 So, it is a Python UDF which takes in no argument and always returns 'yes'. 
(Note: This UDF is created on the fly... so for testing, I am currently just 
using this dummy always 'yes' function) 

After I get check_1 's output, I am converting all my checks (they could be 
more than 1 but in my current test I have only 1) into a Map(string, string).

Finally, I try to do a filter("checks['first'] = 'yes'") to filter the records 
I need.

When I try to do the filter and then do a .explain() it fails with the above 
error.

 

Here is the explain of the dataframe up until before I do the filter():

 
{noformat}
*(1) Project [... cols ...]
+- BatchEvalPython [python_udf_to_create_map([check_1], 
array(pythonUDF0#1851))], [... cols ...]
+- BatchEvalPython [check_1()], [... cols ...]
+- InMemoryTableScan [... cols ...]
+- InMemoryRelation [... cols ...], true, 10000, StorageLevel(disk, 1 replicas)
+- BroadcastNestedLoopJoin BuildLeft, Cross
:- BroadcastExchange IdentityBroadcastMode
: +- *(5) Project [... cols ...]
: +- BatchEvalPython [... Python UDF ...], [... cols ...]
: +- *(4) Project [... cols ...]
: +- BatchEvalPython [... Python UDFs ...], [... cols ...]
: +- *(3) Project [... cols ...]
: +- BatchEvalPython [... Python UDFs ...], [... cols ...]
: +- *(2) Project [... cols ...]
: +- BatchEvalPython [ ... Python UDFs ... ], [ ... cols ... ]
: +- *(1) FileScan csv [ ... cols ... ] Batched: false, Format: CSV, Location: 
InMemoryFileIndex[file:/Users/abdealijk/Documents/data..., PartitionFilters: 
[], PushedFilters: [], ReadSchema: struct<...
+- CartesianProduct
:- *(6) Project [value#1261 AS computed_v1#1263]
: +- Scan ExistingRDD[value#1261]
+- CartesianProduct
:- *(7) Project [value#1265 AS computed_v2#1267]
: +- Scan ExistingRDD[value#1265]
+- *(8) Project [value#1269 AS computed_v3#1271]
+- Scan ExistingRDD[value#1269]{noformat}
I have simplified the explain() output. Let me know if I have deleted some data 
you may need.

 

 

I tried creating a simpler reproducible example, but wasn't able to make 
anything simpler ....



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to