[ https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235889#comment-15235889 ]

Josh Rosen commented on SPARK-14540:
------------------------------------

It looks like there are a few problems here:

First, the ClosureCleaner isn't attempting to clean lambda instances in the 
first place because our isClosure check is a bit too restrictive (SPARK-11630).
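
For illustration, here's a rough sketch (not a proposed patch; the method name 
is made up) of the kind of relaxed check we'd need. Scala 2.11 anonymous 
functions have "$anonfun$" in their class names, while 2.12 lambdas are 
dynamically generated classes with names like "Foo$$Lambda$957/431842435" (see 
the stack trace in the quoted description below), so a purely name-based check 
would have to match both patterns:

{code}
// Illustrative only, not Spark's actual ClosureCleaner code.
def looksLikeClosure(cls: Class[_]): Boolean = {
  val name = cls.getName
  // Scala 2.11-style anonymous function classes
  name.contains("$anonfun$") ||
  // dynamically generated Java 8 / Scala 2.12 lambda classes
  name.contains("$Lambda$")
}
{code}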

Next, we can't directly obtain the bytecode for a lambda class because the 
runtime class is dynamically generated. We also don't seem to be able to 
directly obtain a 
[SerializedLambda|https://docs.oracle.com/javase/8/docs/api/java/lang/invoke/SerializedLambda.html]
handle. One workaround may be to use reflection to clone the closure instance, 
null out its variables, serialize the clone, inspect the serialized output to 
find SerializedLambdas, and then use getImplClass(), getImplMethodName(), and 
getImplMethodKind() to figure out where the implementation method's bytecode 
lives. This seems extremely complex, though; I wonder whether it would just be 
easier to have the underlying Scala compiler generate minimal closures which 
don't over-capture in the first place.
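
As a rough illustration of the "serialize and inspect" part of that workaround 
(a sketch only, not actual Spark code; the helper name is made up): 
ObjectOutputStream applies a lambda's writeReplace substitution before 
consulting replaceObject, so a stream that overrides replaceObject can observe 
the SerializedLambda instances flowing through during serialization:

{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.lang.invoke.SerializedLambda
import scala.collection.mutable.ArrayBuffer

// Sketch only: collect the SerializedLambdas seen while serializing a closure.
// A real implementation would first have to null out any non-serializable
// captured fields, or the serialization itself will fail.
def findSerializedLambdas(closure: AnyRef): Seq[SerializedLambda] = {
  val found = ArrayBuffer.empty[SerializedLambda]
  val out = new ObjectOutputStream(new ByteArrayOutputStream()) {
    enableReplaceObject(true)
    override protected def replaceObject(obj: AnyRef): AnyRef = {
      obj match {
        case sl: SerializedLambda => found += sl
        case _ =>
      }
      obj // observe only; don't actually substitute anything
    }
  }
  out.writeObject(closure)
  out.close()
  found.toSeq
}
{code}

From each SerializedLambda, getImplClass() and getImplMethodName() identify the 
compiler-generated method containing the lambda body, which is where an ASM 
visitor could go looking for the bytecode.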

> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> ----------------------------------------------------------------
>
>                 Key: SPARK-14540
>                 URL: https://issues.apache.org/jira/browse/SPARK-14540
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Spark Core
>            Reporter: Josh Rosen
>
> Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running 
> ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures:
> {code}
> [info] - toplevel return statements in closures are identified at cleaning 
> time *** FAILED *** (32 milliseconds)
> [info]   Expected exception 
> org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no 
> exception was thrown. (ClosureCleanerSuite.scala:57)
> {code}
> and
> {code}
> [info] - user provided closures are actually cleaned *** FAILED *** (56 
> milliseconds)
> [info]   Expected ReturnStatementInClosureException, but got 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not 
> serializable: java.io.NotSerializableException: java.lang.Object
> [info]        - element of array (index: 0)
> [info]        - array (class "[Ljava.lang.Object;", size: 1)
> [info]        - field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]        - object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class 
> org.apache.spark.util.TestUserClosuresActuallyCleaned$, 
> functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I,
>  implementation=invokeStatic 
> org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I,
>  instantiatedMethodType=(I)I, numCaptured=1])
> [info]        - element of array (index: 0)
> [info]        - array (class "[Ljava.lang.Object;", size: 1)
> [info]        - field (class "java.lang.invoke.SerializedLambda", name: 
> "capturedArgs", type: "class [Ljava.lang.Object;")
> [info]        - object (class "java.lang.invoke.SerializedLambda", 
> SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, 
> functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;,
>  implementation=invokeStatic 
> org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  
> instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;,
>  numCaptured=1])
> [info]        - field (class "org.apache.spark.rdd.MapPartitionsRDD", name: 
> "f", type: "interface scala.Function3")
> [info]        - object (class "org.apache.spark.rdd.MapPartitionsRDD", 
> MapPartitionsRDD[2] at apply at Transformer.scala:22)
> [info]        - field (class "scala.Tuple2", name: "_1", type: "class 
> java.lang.Object")
> [info]        - root object (class "scala.Tuple2", (MapPartitionsRDD[2] at 
> apply at 
> Transformer.scala:22,org.apache.spark.SparkContext$$Lambda$957/431842435@6e803685)).
> [info]   This means the closure provided by user is not actually cleaned. 
> (ClosureCleanerSuite.scala:78)
> {code}
> We'll need to figure out a closure cleaning strategy which works for 2.12 
> lambdas.


