[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17080359#comment-17080359 ]

Josh Rosen commented on SPARK-31399:
------------------------------------

h3. My rough first impression

I think the problem is that Spark 3.x isn't performing full cleaning of 
lambdas: the old cleaning logic (which clones closures and nulls out 
unreferenced fields) only seems to run for non-lambdas 
([source|https://github.com/apache/spark/blob/e42a3945acd614a26c7941a9eed161b500fb4520/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L259-L260]);
 the lambda cleaner runs a 
[subset|https://github.com/apache/spark/blame/e42a3945acd614a26c7941a9eed161b500fb4520/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala#L374-L382]
 of the old cleaning logic to check for {{return}} statements, skipping the 
rest of the cleaning steps.
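
For anyone skimming: the "cleaning" in question boils down to dropping captured 
references that the closure doesn't actually need before serialization. Here's a 
very rough sketch of that idea (not the real ClosureCleaner code, which clones 
the closure via constructor-less instantiation rather than mutating it in place):
{code:java}
// Rough illustration only: the real cleaner works on a clone of the closure
// instead of mutating the original as done here.
def dropUnusedOuter(closure: AnyRef, outerIsUnused: Boolean): AnyRef = {
  if (outerIsUnused) {
    closure.getClass.getDeclaredFields
      .filter(_.getName == "$outer")
      .foreach { f =>
        f.setAccessible(true)
        f.set(closure, null) // the spurious reference is gone, so it won't be serialized
      }
  }
  closure
}
{code}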
h3. Different cases to consider 

Some closures might contain _entirely_ spurious {{$outer}} references, where in 
theory we could just omit the entire {{$outer}} object (as opposed to cloning 
it and nulling a subset of its fields). These situations can happen because the 
Scala compiler's (escape?) analysis isn't perfect.
 * In 2.11 these closures will be serializable because the ClosureCleaner can 
null the {{$outer}} reference.
 * In 2.12 these will fail to serialize because we can't perform that type of 
cleaning.
 * In the past we discovered some cases where Scala 2.12 would generate _more_ 
unnecessary {{$outer}} references than 2.11, so a closure which could have been 
serialized _even without any cleaning_ on 2.11 would require cleaning to 
serialize on 2.12. SPARK-14540 describes a few examples of this: I think the 
known cases have been fixed in newer 2.12 versions.
 * In other cases, however, _both_ 2.11 and 2.12 generate entirely spurious 
references. These aren't behavior regressions w.r.t. capture (both versions 
capture the same things), but such cases will be broken on 2.12 unless we add 
full cleaning support for lambdas (or improve Scala 2.12's analysis beyond 
2.11's so the compiler never generates the unnecessary reference in the first 
place).

In other cases, however, we can't entirely null out all captured references 
because the closure needs to transitively access some fields or methods through 
those references. Even if the compiler's analysis were perfect, I think these 
cases would still require cleaning to be serializable.
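
As a hypothetical (non-REPL) illustration of that second case (names made up for 
this comment): the lambda below captures {{this}} only so it can call 
{{scale()}}, which in turn reads {{factor}}, so even perfect capture analysis 
has to keep some reference to the enclosing instance. A cleaned clone could null 
{{unusedCache}}, but as written the class would still need to be serializable 
for the job to run:
{code:java}
import org.apache.spark.rdd.RDD

class Pipeline(factor: Int) {
  private val unusedCache = new Object          // cleanable: the closure never reaches it
  private def scale(x: Int): Int = x * factor   // needed transitively by the closure
  // The lambda captures `this` in order to call scale():
  def scaleAll(nums: RDD[Int]): RDD[Int] = nums.map(x => scale(x))
}
{code}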
h3. Example of unnecessary $outer capture in both 2.11 and 2.12

Here's an example of a closure which over-captures in both 2.11 and 2.12. I'm 
deliberately using {{sc.emptyRDD}} instead of {{spark.range}} because {{range}} 
makes its own closure cleaner calls and that clutters up the logs:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
case class Foo(id: String)
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => Foo("") }

// Exiting paste mode, now interpreting.

20/04/10 00:56:24 DEBUG ClosureCleaner: +++ Cleaning closure <function1> 
($line14.$read$$iw$$iw$$anonfun$1) +++
20/04/10 00:56:24 DEBUG ClosureCleaner:  + declared fields: 2
20/04/10 00:56:24 DEBUG ClosureCleaner:      public static final long 
$line14.$read$$iw$$iw$$anonfun$1.serialVersionUID
20/04/10 00:56:24 DEBUG ClosureCleaner:      private final 
$line14.$read$$iw$$iw $line14.$read$$iw$$iw$$anonfun$1.$outer
20/04/10 00:56:24 DEBUG ClosureCleaner:  + declared methods: 2
20/04/10 00:56:24 DEBUG ClosureCleaner:      public final java.lang.Object 
$line14.$read$$iw$$iw$$anonfun$1.apply(java.lang.Object)
20/04/10 00:56:24 DEBUG ClosureCleaner:      public final 
$line14.$read$$iw$$iw$Foo $line14.$read$$iw$$iw$$anonfun$1.apply(int)
20/04/10 00:56:24 DEBUG ClosureCleaner:  + inner classes: 0
20/04/10 00:56:24 DEBUG ClosureCleaner:  + outer classes: 1
20/04/10 00:56:24 DEBUG ClosureCleaner:      $line14.$read$$iw$$iw
20/04/10 00:56:24 DEBUG ClosureCleaner:  + outer objects: 1
20/04/10 00:56:24 DEBUG ClosureCleaner:      $line14.$read$$iw$$iw@163cd4a6
20/04/10 00:56:24 DEBUG ClosureCleaner:  + populating accessed fields because 
this is the starting closure
20/04/10 00:56:24 DEBUG ClosureCleaner:  + fields accessed by starting closure: 
2
20/04/10 00:56:24 DEBUG ClosureCleaner:      (class $line14.$read$$iw$$iw,Set())
20/04/10 00:56:24 DEBUG ClosureCleaner:      (class java.lang.Object,Set())
20/04/10 00:56:24 DEBUG ClosureCleaner:  + outermost object is a REPL line 
object, so we clone it: (class 
$line14.$read$$iw$$iw,$line14.$read$$iw$$iw@163cd4a6)
20/04/10 00:56:24 DEBUG ClosureCleaner:  + cloning the object 
$line14.$read$$iw$$iw@163cd4a6 of class $line14.$read$$iw$$iw
20/04/10 00:56:24 DEBUG ClosureCleaner:  +++ closure <function1> 
($line14.$read$$iw$$iw$$anonfun$1) is now cleaned +++
defined class Foo
nonSerializableObj: Object = java.lang.Object@f1d5f3
df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[1] at map at <console>:16
{code}
Here we have an outer object of type {{$line14.$read$$iw$$iw}}, but the 
"{{fields accessed by starting closure}}" log output shows that the closure 
doesn't actually reference any of the outer object's fields.
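
(Aside, for anyone tracing where that "fields accessed" information comes from: 
it's computed by bytecode inspection. A heavily simplified sketch of the idea is 
below, assuming plain {{org.objectweb.asm}} is on the classpath; the real logic 
in ClosureCleaner's FieldAccessFinder also follows method calls and handles 
several other cases.)
{code:java}
import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Opcodes}
import scala.collection.mutable

// Which fields of the outer class (JVM internal name, e.g. "$line14/$read$$iw$$iw")
// does the closure class read directly? Simplified: scans only GETFIELD instructions.
def accessedFields(closureClass: Class[_], outerInternalName: String): Set[String] = {
  val found = mutable.Set.empty[String]
  new ClassReader(closureClass.getName).accept(new ClassVisitor(Opcodes.ASM7) {
    override def visitMethod(access: Int, name: String, desc: String,
                             sig: String, exceptions: Array[String]): MethodVisitor =
      new MethodVisitor(Opcodes.ASM7) {
        override def visitFieldInsn(op: Int, owner: String, field: String, fdesc: String): Unit =
          if (op == Opcodes.GETFIELD && owner == outerInternalName) found += field
      }
  }, ClassReader.SKIP_DEBUG)
  found.toSet
}
{code}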

In 3.0 this fails because the closure cleaning doesn't omit the outer object 
reference:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
case class Foo(id: String)
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => Foo("") }

// Exiting paste mode, now interpreting.

20/04/10 01:00:34 DEBUG ClosureCleaner: Cleaning lambda: $anonfun$df$1$adapted
20/04/10 01:00:34 DEBUG ClosureCleaner:  +++ Lambda closure 
($anonfun$df$1$adapted) is now cleaned +++
org.apache.spark.SparkException: Task not serializable
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.map(RDD.scala:395)
  ... 41 elided
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
        - object not serializable (class: java.lang.Object, value: 
java.lang.Object@76c1ede3)
        - field (class: $iw, name: nonSerializableObj, type: class 
java.lang.Object)
        - object (class $iw, $iw@463a0302)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, 
SerializedLambda[capturingClass=class $iw, 
functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;,
 implementation=invokeStatic 
$anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, 
instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class $Lambda$2029/1917668362, 
$Lambda$2029/1917668362@60af08b2)
  at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 49 more
 {code}
h3. Example of necessary reference to cleanable $outer object

Here's a similar example, except this time the closure needs to reference a 
subset of the outer object's fields:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4-SNAPSHOT
      /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
val constant = 1
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => constant }

// Exiting paste mode, now interpreting.

20/04/10 01:06:30 DEBUG ClosureCleaner: +++ Cleaning closure <function1> 
($line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1) +++
20/04/10 01:06:30 DEBUG ClosureCleaner:  + declared fields: 2
20/04/10 01:06:30 DEBUG ClosureCleaner:      public static final long 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.serialVersionUID
20/04/10 01:06:30 DEBUG ClosureCleaner:      private final 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.$outer
20/04/10 01:06:30 DEBUG ClosureCleaner:  + declared methods: 3
20/04/10 01:06:30 DEBUG ClosureCleaner:      public final int 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(int)
20/04/10 01:06:30 DEBUG ClosureCleaner:      public final java.lang.Object 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(java.lang.Object)
20/04/10 01:06:30 DEBUG ClosureCleaner:      public int 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply$mcII$sp(int)
20/04/10 01:06:30 DEBUG ClosureCleaner:  + inner classes: 0
20/04/10 01:06:30 DEBUG ClosureCleaner:  + outer classes: 1
20/04/10 01:06:30 DEBUG ClosureCleaner:      
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw
20/04/10 01:06:30 DEBUG ClosureCleaner:  + outer objects: 1
20/04/10 01:06:30 DEBUG ClosureCleaner:      
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@7bc239db
20/04/10 01:06:30 DEBUG ClosureCleaner:  + populating accessed fields because 
this is the starting closure
20/04/10 01:06:30 DEBUG ClosureCleaner:  + fields accessed by starting closure: 
2
20/04/10 01:06:30 DEBUG ClosureCleaner:      (class 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw,Set(constant))
20/04/10 01:06:30 DEBUG ClosureCleaner:      (class java.lang.Object,Set())
20/04/10 01:06:30 DEBUG ClosureCleaner:  + outermost object is a REPL line 
object, so we clone it: (class 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw,$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@7bc239db)
20/04/10 01:06:30 DEBUG ClosureCleaner:  + cloning the object 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw@7bc239db of class 
$line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw
20/04/10 01:06:30 DEBUG ClosureCleaner:  +++ closure <function1> 
($line14.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1) is now cleaned +++
constant: Int = 1
nonSerializableObj: Object = java.lang.Object@5dd5422f
df: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:27
{code}
Here Spark has cloned both the closure _and_ the outer object, nulling the 
cloned outer object's unaccessed fields.

As expected, this fails in 2.12:
{code:java}
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.0.0-SNAPSHOT
      /_/

Using Scala version 2.12.10 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_191)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

sc.setLogLevel("DEBUG")
val constant = 1
val nonSerializableObj = new Object
val df = sc.emptyRDD[Int].map { _ => constant }

// Exiting paste mode, now interpreting.

20/04/10 01:07:49 DEBUG ClosureCleaner: Cleaning lambda: $anonfun$df$1
20/04/10 01:07:49 DEBUG ClosureCleaner:  +++ Lambda closure ($anonfun$df$1) is 
now cleaned +++
org.apache.spark.SparkException: Task not serializable
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2379)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:396)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.map(RDD.scala:395)
  ... 47 elided
Caused by: java.io.NotSerializableException: java.lang.Object
Serialization stack:
        - object not serializable (class: java.lang.Object, value: 
java.lang.Object@291fbdc9)
        - field (class: $iw, name: nonSerializableObj, type: class 
java.lang.Object)
        - object (class $iw, $iw@6f3b4c9a)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, 
SerializedLambda[capturingClass=class $iw, 
functionalInterfaceMethod=scala/runtime/java8/JFunction1$mcII$sp.apply$mcII$sp:(I)I,
 implementation=invokeStatic $anonfun$df$1:(L$iw;I)I, 
instantiatedMethodType=(I)I, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class $Lambda$1853/1591026569, 
$Lambda$1853/1591026569@1cf38112)
  at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 55 more
 {code}
h3. Brainstorming on possible fixes and workarounds

Some rough ideas on how Spark / Scala could fix this (not all of them good, but 
listed for completeness / brainstorming):
 * If the Scala compiler didn't over-capture then more closures would be 
serializable without any cleaning. However:
 ** It would require a new Scala release and require users to upgrade to it.
 ** It's probably pretty hard to do, especially without impacting compile 
performance.
 ** It isn't a complete fix because it only addresses the "completely spurious 
capture" case, not the "capture but only need a subset of things along the 
reference chains" cases.
 ** Even if a closure is serializable without cleaning, unreferenced outer 
object fields might add a lot of bloat to the closure which slows down 
deserialization (which is performed on a per-task basis).
 * If Spark implemented "full" closure cleaning for lambdas then we might be 
able to achieve near-complete parity.
 ** I'm unsure of technical feasibility, but this is definitely worth 
investigating further: this seems like the most user-friendly fix.
 ** If we do this, we should re-introduce some of the ClosureCleaner suite 
tests which were removed in 
[https://github.com/apache/spark/commit/8bc304f97ee693b57f33fa6708eb63e2d641c609]
 

Some workarounds which require users to change their code:
 * Users could mark non-serializable / unused closure fields as {{@transient}} 
(see the sketch after this list):
 ** In my examples above, marking the {{nonSerializableObj}} field as 
{{@transient}} would allow the closure to be serialized without cleaning (and 
avoids size bloat).
 * Moving the failures to compile-time:
 ** This "non-serializable closure" issue is especially painful because it 
shows up at runtime, not at compile time: if users' code doesn't have full unit 
/ integration test coverage then finding these issues can be an unpleasant game 
of whack-a-mole, tracking down broken jobs (hopefully in a test / staging 
environment and not production).
 ** If we could somehow move these failures to compile time (whether through 
macros or a compiler plugin) then users would have a much more reasonable 
porting dev loop, since we'd be moving further in the direction of "if the code 
compiles then it'll probably run successfully". It looks like Spores can do 
this: [https://scalacenter.github.io/spores/java-serialization.html]
 ** In notebooks and REPLs the compile-time and run-time phases are 
interleaved, so these approaches wouldn't be of as much help there.
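
To spell out the {{@transient}} workaround from the list above: in the failing 
2.12 REPL examples, the paste block could be adjusted as follows (a sketch of 
the workaround; it relies on each top-level {{val}} becoming a field of the REPL 
line object, so the annotation makes Java serialization skip it):
{code:java}
@transient val nonSerializableObj = new Object    // skipped when the captured line object is serialized
val constant = 1
val df = sc.emptyRDD[Int].map { _ => constant }   // now serializes even without full cleaning
{code}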

> closure cleaner is broken in Spark 3.0
> --------------------------------------
>
>                 Key: SPARK-31399
>                 URL: https://issues.apache.org/jira/browse/SPARK-31399
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Priority: Blocker
>
> The `ClosureCleaner` only supports Scala functions, and it uses the following 
> check to catch closures:
> {code}
>   // Check whether a class represents a Scala closure
>   private def isClosure(cls: Class[_]): Boolean = {
>     cls.getName.contains("$anonfun$")
>   }
> {code}
> This doesn't work in 3.0 anymore, as we upgraded to Scala 2.12 and most Scala 
> functions become Java lambdas.
> As an example, the following code works well in the Spark 2.4 shell:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> import org.apache.spark.sql.functions.lit
> defined class Foo
> col: org.apache.spark.sql.Column = 123
> df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
> {code}
> But it fails in 3.0:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
>   at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>   at org.apache.spark.rdd.RDD.map(RDD.scala:421)
>   ... 39 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>       - object not serializable (class: org.apache.spark.sql.Column, value: 
> 123)
>       - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
>       - object (class $iw, $iw@2d87ac2b)
>       - element of array (index: 0)
>       - array (class [Ljava.lang.Object;, size 1)
>       - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
> type: class [Ljava.lang.Object;)
>       - object (class java.lang.invoke.SerializedLambda, 
> SerializedLambda[capturingClass=class $iw, 
> functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;,
>  implementation=invokeStatic 
> $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, 
> instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
>       - writeReplace data (class: java.lang.invoke.SerializedLambda)
>       - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
>   at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>   at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
>   ... 47 more
> {code}


