[ https://issues.apache.org/jira/browse/SPARK-14540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15235889#comment-15235889 ]
Josh Rosen commented on SPARK-14540: ------------------------------------ It looks like there's a few problems here: First, the ClosureCleaner isn't attempting to clean lambda instances in the first place because our isClosure check is a bit too restrictive (SPARK-11630). Next, we can't directly obtain the bytecode for a lambda class because the runtime class is dynamically generated. We also don't seem to be able to directly obtain a [SerializedLambda|https://docs.oracle.com/javase/8/docs/api/java/lang/invoke/SerializedLambda.html] handle. One workaround may be to use reflection to clone the closure instance and null out its variables, serialize it, inspect the serialized output to find SerializedLambdas, then use getImplMethodKind() to figure out where to find the bytecode. This seems extremely complex, though; I wonder if it would just be easier to have the underlying Scala compiler generate minimal closures which don't over-capture in the first place. > Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner > ---------------------------------------------------------------- > > Key: SPARK-14540 > URL: https://issues.apache.org/jira/browse/SPARK-14540 > Project: Spark > Issue Type: Sub-task > Components: Spark Core > Reporter: Josh Rosen > > Using https://github.com/JoshRosen/spark/tree/build-for-2.12, I tried running > ClosureCleanerSuite with Scala 2.12 and ran into two bad test failures: > {code} > [info] - toplevel return statements in closures are identified at cleaning > time *** FAILED *** (32 milliseconds) > [info] Expected exception > org.apache.spark.util.ReturnStatementInClosureException to be thrown, but no > exception was thrown. (ClosureCleanerSuite.scala:57) > {code} > and > {code} > [info] - user provided closures are actually cleaned *** FAILED *** (56 > milliseconds) > [info] Expected ReturnStatementInClosureException, but got > org.apache.spark.SparkException: Job aborted due to stage failure: Task not > serializable: java.io.NotSerializableException: java.lang.Object > [info] - element of array (index: 0) > [info] - array (class "[Ljava.lang.Object;", size: 1) > [info] - field (class "java.lang.invoke.SerializedLambda", name: > "capturedArgs", type: "class [Ljava.lang.Object;") > [info] - object (class "java.lang.invoke.SerializedLambda", > SerializedLambda[capturingClass=class > org.apache.spark.util.TestUserClosuresActuallyCleaned$, > functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I, > implementation=invokeStatic > org/apache/spark/util/TestUserClosuresActuallyCleaned$.org$apache$spark$util$TestUserClosuresActuallyCleaned$$$anonfun$69:(Ljava/lang/Object;I)I, > instantiatedMethodType=(I)I, numCaptured=1]) > [info] - element of array (index: 0) > [info] - array (class "[Ljava.lang.Object;", size: 1) > [info] - field (class "java.lang.invoke.SerializedLambda", name: > "capturedArgs", type: "class [Ljava.lang.Object;") > [info] - object (class "java.lang.invoke.SerializedLambda", > SerializedLambda[capturingClass=class org.apache.spark.rdd.RDD, > functionalInterfaceMethod=scala/Function3.apply:(Ljava/lang/Object;Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, > implementation=invokeStatic > org/apache/spark/rdd/RDD.org$apache$spark$rdd$RDD$$$anonfun$20$adapted:(Lscala/Function1;Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, > > instantiatedMethodType=(Lorg/apache/spark/TaskContext;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, > numCaptured=1]) > [info] - field (class "org.apache.spark.rdd.MapPartitionsRDD", name: > "f", type: "interface scala.Function3") > [info] - object (class "org.apache.spark.rdd.MapPartitionsRDD", > MapPartitionsRDD[2] at apply at Transformer.scala:22) > [info] - field (class "scala.Tuple2", name: "_1", type: "class > java.lang.Object") > [info] - root object (class "scala.Tuple2", (MapPartitionsRDD[2] at > apply at > Transformer.scala:22,org.apache.spark.SparkContext$$Lambda$957/431842435@6e803685)). > [info] This means the closure provided by user is not actually cleaned. > (ClosureCleanerSuite.scala:78) > {code} > We'll need to figure out a closure cleaning strategy which works for 2.12 > lambdas. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org