[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17080359#comment-17080359 ]
Josh Rosen commented on SPARK-31399:
------------------------------------

h3. My rough first impression

I think the problem is that Spark 3.x isn't performing full cleaning of lambdas: the old cleaning logic (which clones closures and nulls out unreferenced fields) only seems to run for non-lambdas ([source|https://github.com/apache/spark/blob/e42a3945acd614a26c7941a9eed161b500fb4520/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L259-L260]); the lambda cleaner runs a [subset|https://github.com/apache/spark/blame/e42a3945acd614a26c7941a9eed161b500fb4520/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L374-L382] of the old cleaning logic to check for {{return}} statements, skipping the rest of the cleaning steps.

h3. Different cases to consider

Some closures might contain _entirely_ spurious {{$outer}} references, where in theory we could just omit the entire {{$outer}} object (as opposed to cloning it and nulling a subset of its fields). These situations can happen because the Scala compiler's (escape?) analysis isn't perfect.
* In 2.11 these closures will be serializable because the ClosureCleaner can null the {{$outer}} reference.
* In 2.12 these will fail to serialize because we can't perform that type of cleaning.
* In the past we discovered some cases where Scala 2.12 would generate _more_ unnecessary {{$outer}} references than 2.11, so a closure which could have been serialized _even without any cleaning_ on 2.11 would require cleaning to serialize on 2.12. SPARK-14540 describes a few examples of this; I think the known cases have been fixed in newer 2.12 versions.
* In other cases, however, _both_ 2.11 and 2.12 generate entirely spurious references. These aren't behavior regressions w.r.t. capture (both versions capture the same things), but such cases will be broken on 2.12 unless we add full cleaning support for lambdas (or improve Scala 2.12's analysis beyond 2.11's so the compiler never generates the unnecessary reference in the first place).

In other cases, however, we can't entirely null out all captured references because we need to transitively access some fields or methods in those references. Even if the compiler analysis were perfect, I think these cases would still require cleaning to be serializable.

h3. Example of unnecessary $outer capture in both 2.11 and 2.12

Here's an example of a closure which over-captures in both 2.11 and 2.12. Here I'm deliberately using {{sc.emptyRDD}} instead of {{spark.range}} because {{range}} makes its own closure cleaner calls and that clutters up the logs:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
case class Foo(id: String)
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => Foo("") }

// Exiting paste mode, now interpreting.

20/04/10 00:56:24 DEBUG ClosureCleaner: +++ Cleaning closure <function1> ($line14.$read$$iw$$iw$$anonfun$1) +++
20/04/10 00:56:24 DEBUG ClosureCleaner: + declared fields: 2
20/04/10 00:56:24 DEBUG ClosureCleaner: public static final long $line14.$read$$iw$$iw$$anonfun$1.serialVersionUID
20/04/10 00:56:24 DEBUG ClosureCleaner: private final $line14.$read$$iw$$iw $line14.$read$$iw$$iw$$anonfun$1.$outer
20/04/10 00:56:24 DEBUG ClosureCleaner: + declared methods: 2
20/04/10 00:56:24 DEBUG ClosureCleaner: public final java.lang.Object $line14.$read$$iw$$iw$$anonfun$1.apply(java.lang.Object)
20/04/10 00:56:24 DEBUG ClosureCleaner: public final $line14.$read$$iw$$iw$Foo $line14.$read$$iw$$iw$$anonfun$1.apply(int)
20/04/10 00:56:24 DEBUG ClosureCleaner: + inner classes: 0
20/04/10 00:56:24 DEBUG ClosureCleaner: + outer classes: 1
20/04/10 00:56:24 DEBUG ClosureCleaner: $line14.$read$$iw$$iw
20/04/10 00:56:24 DEBUG ClosureCleaner: + outer objects: 1
20/04/10 00:56:24 DEBUG ClosureCleaner: $line14.$read$$iw$$iw@163cd4a6
20/04/10 00:56:24 DEBUG ClosureCleaner: + populating accessed fields because this is the starting closure
20/04/10 00:56:24 DEBUG ClosureCleaner: + fields accessed by starting closure: 2
20/04/10 00:56:24 DEBUG ClosureCleaner: (class $line14.$read$$iw$$iw,Set())
20/04/10 00:56:24 DEBUG ClosureCleaner: (class java.lang.Object,Set())
20/04/10 00:56:24 DEBUG ClosureCleaner: + outermost object is a REPL line object, so we clone it: (class $line14.$read$$iw$$iw,$line14.$read$$iw$$iw@163cd4a6)
20/04/10 00:56:24 DEBUG ClosureCleaner: + cloning the object $line14.$read$$iw$$iw@163cd4a6 of class $line14.$read$$iw$$iw
20/04/10 00:56:24 DEBUG ClosureCleaner: +++ closure <function1> ($line14.$read$$iw$$iw$$anonfun$1) is now cleaned +++
defined class Foo
nonSerializableObj: Object = java.lang.Object@f1d5f3
df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[1] at map at <console>:16
{code}
Here we have an outer object of type {{$line14.$read$$iw$$iw}} but the "{{fields accessed by starting closure:}}" log output shows that the closure doesn't actually reference any of the outer object's fields.

In 3.0 this fails because the closure cleaning doesn't omit the outer object reference:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
case class Foo(id: String)
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => Foo("") }

// Exiting paste mode, now interpreting.

20/04/10 01:00:34 DEBUG ClosureCleaner: Cleaning lambda: $anonfun$df$1$adapted
20/04/10 01:00:34 DEBUG ClosureCleaner: +++ Lambda closure ($anonfun$df$1$adapted) is now cleaned +++
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.map(RDD.scala:395)
  ... 41 elided
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
  - object not serializable (class: java.lang.Object, value: java.lang.Object@76c1ede3)
  - field (class: $iw, name: nonSerializableObj, type: class java.lang.Object)
  - object (class $iw, $iw@463a0302)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 1)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class $Lambda$2029/1917668362, $Lambda$2029/1917668362@60af08b2)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 49 more
{code}

h3. Example of necessary reference to cleanable $outer object

Here's a similar example, except this time the closure needs to reference a subset of the outer object's fields:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
val constant = 1
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => constant }

// Exiting paste mode, now interpreting.

20/04/10 01:06:30 DEBUG ClosureCleaner: +++ Cleaning closure <function1> ($line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1) +++
20/04/10 01:06:30 DEBUG ClosureCleaner: + declared fields: 2
20/04/10 01:06:30 DEBUG ClosureCleaner: public static final long $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.serialVersionUID
20/04/10 01:06:30 DEBUG ClosureCleaner: private final $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.$outer
20/04/10 01:06:30 DEBUG ClosureCleaner: + declared methods: 3
20/04/10 01:06:30 DEBUG ClosureCleaner: public final int $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(int)
20/04/10 01:06:30 DEBUG ClosureCleaner: public final java.lang.Object $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(java.lang.Object)
20/04/10 01:06:30 DEBUG ClosureCleaner: public int $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply$mcII$sp(int)
20/04/10 01:06:30 DEBUG ClosureCleaner: + inner classes: 0
20/04/10 01:06:30 DEBUG ClosureCleaner: + outer classes: 1
20/04/10 01:06:30 DEBUG ClosureCleaner: $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw
20/04/10 01:06:30 DEBUG ClosureCleaner: + outer objects: 1
20/04/10 01:06:30 DEBUG ClosureCleaner: $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@7bc239db
20/04/10 01:06:30 DEBUG ClosureCleaner: + populating accessed fields because this is the starting closure
20/04/10 01:06:30 DEBUG ClosureCleaner: + fields accessed by starting closure: 2
20/04/10 01:06:30 DEBUG ClosureCleaner: (class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw,Set(constant))
20/04/10 01:06:30 DEBUG ClosureCleaner: (class java.lang.Object,Set())
20/04/10 01:06:30 DEBUG ClosureCleaner: + outermost object is a REPL line object, so we clone it: (class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw,$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@7bc239db)
20/04/10 01:06:30 DEBUG ClosureCleaner: + cloning the object $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@7bc239db of class $line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw
20/04/10 01:06:30 DEBUG ClosureCleaner: +++ closure <function1> ($line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1) is now cleaned +++
constant: Int = 1
nonSerializableObj: Object = java.lang.Object@5dd5422f
df: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
{code}
Here Spark has cloned both the closure _and_ the outer object, nulling the cloned outer object's unaccessed fields.

As expected, this fails in 2.12:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
val constant = 1
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => constant }

// Exiting paste mode, now interpreting.

20/04/10 01:07:49 DEBUG ClosureCleaner: Cleaning lambda: $anonfun$df$1
20/04/10 01:07:49 DEBUG ClosureCleaner: +++ Lambda closure ($anonfun$df$1) is now cleaned +++
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.map(RDD.scala:395)
  ... 47 elided
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
  - object not serializable (class: java.lang.Object, value: java.lang.Object@291fbdc9)
  - field (class: $iw, name: nonSerializableObj, type: class java.lang.Object)
  - object (class $iw, $iw@6f3b4c9a)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 1)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I, implementation=invokeStatic $anonfun$df$1:(L$iw;I)I, instantiatedMethodType=(I)I, numCaptured=1])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class $Lambda$1853/1591026569, $Lambda$1853/1591026569@1cf38112)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 55 more
{code}

h3. Brainstorming on possible fixes and workarounds

Some rough ideas on how Spark / Scala could fix this (not all of them good, but listed for completeness / brainstorming):
* If the Scala compiler didn't over-capture then more closures would be serializable without any cleaning. However:
** It requires a new Scala release and requires users to upgrade to it.
** It's probably pretty hard to do, especially without impacting compile performance.
** It isn't a complete fix because it only addresses the "completely spurious capture" case, not the "capture but only need a subset of things along the reference chains" cases.
** Even if a closure is serializable without cleaning, unreferenced outer object fields might add a lot of bloat to the closure which slows down deserialization (which is performed on a per-task basis).
* If Spark implemented "full" closure cleaning for lambdas then we might be able to achieve near-complete parity.
** I'm unsure of technical feasibility, but this is definitely worth investigating further: this seems like the most user-friendly fix.
** If we do this, we should re-introduce some of the ClosureCleaner suite tests which were removed in [https://github.com/apache/spark/commit/8bc304f97ee693b57f33fa6708eb63e2d641c609]

Some workarounds which require users to change their code:
* Users could mark unserializable / unused closure fields as {{@transient}}:
** In my examples above, marking the {{nonSerializableObj}} field as {{@transient}} would allow the closure to be serialized without cleaning (and avoids size bloat).
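
A minimal sketch of the {{@transient}} workaround, runnable outside Spark and the REPL. The classes {{OuterPlain}} and {{OuterTransient}} are hypothetical stand-ins for the REPL line object ({{$iw}}), and {{serializes}} is a helper made up for this illustration. It assumes Scala 2.12 or later, where a function literal that reads a field compiles to a serializable lambda capturing {{this}}:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for the REPL line object ($iw): the closure reads
// `constant`, so it captures `this` and drags every field along with it.
class OuterPlain extends Serializable {
  val constant = 1
  val nonSerializableObj = new Object
  def closure: Int => Int = _ => constant
}

// Same shape, but the unused field is skipped by Java serialization.
class OuterTransient extends Serializable {
  val constant = 1
  @transient val nonSerializableObj = new Object
  def closure: Int => Int = _ => constant
}

object TransientSketch {
  // True if plain Java serialization of `obj` succeeds (no Spark cleaning involved).
  def serializes(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    println(serializes(new OuterPlain().closure))     // expected: false
    println(serializes(new OuterTransient().closure)) // expected: true
  }
}
```

Note that a {{@transient}} field comes back as {{null}} after deserialization, so this only suits fields the closure never reads on the executor side.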
* Moving the failures to compile-time:
** This "non-serializable closure" issue is especially painful because it shows up at runtime, not compile-time: if users' code doesn't have full unit / integration test coverage then finding these issues can be an unpleasant game of whack-a-mole, tracking down broken jobs (hopefully in a test / staging environment and not production).
** If we could somehow move these failures to compile time (whether through macros or a compiler plugin) then users would have a much more reasonable porting devloop since we'd be moving further in the direction of "if the code compiles then it'll probably run successfully". It looks like Spores can do this: [https://scalacenter.github.io/spores/java-serialization.html]
** In notebooks and REPLs the compile-time and run-time phases are interleaved, so these approaches wouldn't be of as much help there.

> closure cleaner is broken in Spark 3.0
> --------------------------------------
>
> Key: SPARK-31399
> URL: https://issues.apache.org/jira/browse/SPARK-31399
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.0.0
> Reporter: Wenchen Fan
> Priority: Blocker
>
> The `ClosureCleaner` only support Scala functions and it uses the following
> check to catch closures
> {code}
> // Check whether a class represents a Scala closure
> private def isClosure(cls: Class[_]): Boolean = {
>   cls.getName.contains("$anonfun$")
> }
> {code}
> This doesn't work in 3.0 any more as we upgrade to Scala 2.12 and most Scala
> functions become Java lambdas.
> As an example, the following code works well in Spark 2.4 Spark Shell:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> import org.apache.spark.sql.functions.lit
> defined class Foo
> col: org.apache.spark.sql.Column = 123
> df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
> {code}
> But fails in 3.0
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
>   at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>   at org.apache.spark.rdd.RDD.map(RDD.scala:421)
>   ... 39 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>   - object not serializable (class: org.apache.spark.sql.Column, value: 123)
>   - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
>   - object (class $iw, $iw@2d87ac2b)
>   - element of array (index: 0)
>   - array (class [Ljava.lang.Object;, size 1)
>   - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>   - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
>   - writeReplace data (class: java.lang.invoke.SerializedLambda)
>   - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
>   ... 47 more
> {code}