[ https://issues.apache.org/jira/browse/SPARK-18589?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-18589:
------------------------------------

    Assignee: Davies Liu  (was: Apache Spark)

> persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child"
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18589
>                 URL: https://issues.apache.org/jira/browse/SPARK-18589
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 2.0.2, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>            Assignee: Davies Liu
>            Priority: Critical
>
> This smells like another optimizer bug, similar to SPARK-17100 and SPARK-18254. I'm seeing it on 2.0.2 and on master at commit {{fb07bbe575aabe68422fd3a31865101fb7fa1722}}.
>
> I don't have a minimal repro for this yet, but the error I'm seeing is:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
> : java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires attributes from more than one child.
> 	at scala.sys.package$.error(package.scala:27)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
> 	at scala.collection.immutable.Stream.foreach(Stream.scala:594)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
> 	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
> 	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
> 	at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
> 	at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
> 	at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> 	at scala.collection.immutable.List.foldLeft(List.scala:84)
> 	at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
> 	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
> 	at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
> 	at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
> 	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at py4j.Gateway.invoke(Gateway.java:280)
> 	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at py4j.GatewayConnection.run(GatewayConnection.java:214)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
>
> The extended plan (cleaned of field names) is as follows:
> {code}
> == Parsed Logical Plan ==
> 'Filter NOT ('expected_prediction = 'prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
>
> == Analyzed Logical Plan ==
> p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: vector, probability: vector, prediction: double, expected_prediction: float
> Filter NOT (cast(expected_prediction as double) = prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
>
> == Optimized Logical Plan ==
> Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) AS expected_prediction]
> +- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) as double) = UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key = _blocking_key))
>    :- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>    :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>    :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>    :           +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>    :              +- Scan ExistingRDD[...]
>    +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>       +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>          +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>                +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>                   +- Scan ExistingRDD[...]
>
> == Physical Plan ==
> java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, struct(...).person), requires attributes from more than one child.
> {code}
>
> Note the error at the end when Spark tries to print the physical plan. Also note that in the optimized plan the filter comparing {{expected_prediction}} with {{prediction}} has been pushed into the join condition, so the {{<lambda>}} Python UDF there references attributes from both sides of the join. I've scrubbed some Project fields from the plans to simplify the display, but if I've scrubbed anything you think is important, let me know.
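>
> To make the structure easier to see, here is a rough sketch of the shape of the failing query. This is not my actual code: the sample data and the trivial {{pair_features}} stand-in UDF are hypothetical placeholders, but the overall shape (a Python UDF over columns from both sides of a self-join, followed by a filter on the UDF output) matches the plans above:
> {code}
> from pyspark.sql import SparkSession, functions as F
> from pyspark.sql.types import DoubleType
>
> spark = SparkSession.builder.getOrCreate()
>
> # Hypothetical stand-in for the real feature-building UDF; the relevant
> # property is that it consumes columns from *both* sides of the join.
> pair_features = F.udf(lambda a, b: float(a == b), DoubleType())
>
> people = spark.createDataFrame(
>     [(1, "alice"), (2, "bob")], ["_blocking_key", "person"])
>
> p1 = people.alias("p1")
> p2 = people.alias("p2")
>
> matched = (
>     p1.join(p2, F.col("p1._blocking_key") == F.col("p2._blocking_key"))
>       .withColumn("prediction",
>                   pair_features(F.col("p1.person"), F.col("p2.person")))
>       .withColumn("expected_prediction",
>                   (F.col("p1._blocking_key") ==
>                    F.col("p2._blocking_key")).cast("float"))
> )
>
> # The failing operation corresponds to this filter: comparing a plain
> # column against the UDF output lets the optimizer push the comparison
> # into the join condition, as seen in the optimized plan above.
> matched.filter(F.col("expected_prediction") != F.col("prediction")).count()
> {code}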
> I can get around this problem by adding a {{persist()}} right before the operation that fails, which is a filter (see the sketch below).
>
> Any clues on how I can boil this down to a minimal repro? Any clues about where the problem is?
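>
> For reference, here is roughly where the {{persist()}} goes, reusing the hypothetical names from the sketch above:
> {code}
> # Workaround: materialize the intermediate DataFrame before filtering.
> # Without the persist(), the count() fails during physical planning with
> # "Invalid PythonUDF ..., requires attributes from more than one child".
> matched = matched.persist()
> matched.filter(F.col("expected_prediction") != F.col("prediction")).count()
> {code}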