[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-31399: ---------------------------------- Description: The `ClosureCleaner` only supports Scala functions and it uses the following check to catch closures: {code} // Check whether a class represents a Scala closure private def isClosure(cls: Class[_]): Boolean = { cls.getName.contains("$anonfun$") } {code} This no longer works in 3.0 because we upgraded to Scala 2.12, where most Scala functions become Java lambdas. As an example, the following code works well in Spark 2.4 Spark Shell: {code} scala> :pa // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.functions.lit case class Foo(id: String) val col = lit("123") val df = sc.range(0,10,1,1).map { _ => Foo("") } // Exiting paste mode, now interpreting. import org.apache.spark.sql.functions.lit defined class Foo col: org.apache.spark.sql.Column = 123 df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20 {code} But it fails in 3.0: {code} scala> :pa // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.functions.lit case class Foo(id: String) val col = lit("123") val df = sc.range(0,10,1,1).map { _ => Foo("") } // Exiting paste mode, now interpreting. org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159) at org.apache.spark.SparkContext.clean(SparkContext.scala:2371) at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.RDD.map(RDD.scala:421) ... 
39 elided Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column Serialization stack: - object not serializable (class: org.apache.spark.sql.Column, value: 123) - field (class: $iw, name: col, type: class org.apache.spark.sql.Column) - object (class $iw, $iw@2d87ac2b) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393) ... 47 more {code} **Apache Spark 2.4.5 with Scala 2.12** {code} Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 2.4.5 /_/ Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242) Type in expressions to have them evaluated. Type :help for more information. scala> :pa // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.functions.lit case class Foo(id: String) val col = lit("123") val df = sc.range(0,10,1,1).map { _ => Foo("") } // Exiting paste mode, now interpreting. 
org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) at org.apache.spark.SparkContext.clean(SparkContext.scala:2326) at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:393) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) at org.apache.spark.rdd.RDD.map(RDD.scala:392) ... 45 elided Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column Serialization stack: - object not serializable (class: org.apache.spark.sql.Column, value: 123) - field (class: $iw, name: col, type: class org.apache.spark.sql.Column) - object (class $iw, $iw@73534675) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class $Lambda$1952/356563238, $Lambda$1952/356563238@6ca95b1e) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400) ... 
53 more {code} was: The `ClosureCleaner` only support Scala functions and it uses the following check to catch closures {code} // Check whether a class represents a Scala closure private def isClosure(cls: Class[_]): Boolean = { cls.getName.contains("$anonfun$") } {code} This doesn't work in 3.0 any more as we upgrade to Scala 2.12 and most Scala functions become Java lambdas. As an example, the following code works well in Spark 2.4 Spark Shell: {code} scala> :pa // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.functions.lit case class Foo(id: String) val col = lit("123") val df = sc.range(0,10,1,1).map { _ => Foo("") } // Exiting paste mode, now interpreting. import org.apache.spark.sql.functions.lit defined class Foo col: org.apache.spark.sql.Column = 123 df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20 {code} But fails in 3.0 {code} scala> :pa // Entering paste mode (ctrl-D to finish) import org.apache.spark.sql.functions.lit case class Foo(id: String) val col = lit("123") val df = sc.range(0,10,1,1).map { _ => Foo("") } // Exiting paste mode, now interpreting. org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159) at org.apache.spark.SparkContext.clean(SparkContext.scala:2371) at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) at org.apache.spark.rdd.RDD.map(RDD.scala:421) ... 
39 elided Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column Serialization stack: - object not serializable (class: org.apache.spark.sql.Column, value: 123) - field (class: $iw, name: col, type: class org.apache.spark.sql.Column) - object (class $iw, $iw@2d87ac2b) - element of array (index: 0) - array (class [Ljava.lang.Object;, size 1) - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;) - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1]) - writeReplace data (class: java.lang.invoke.SerializedLambda) - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393) ... 47 more {code} > closure cleaner is broken in Scala 2.12 > --------------------------------------- > > Key: SPARK-31399 > URL: https://issues.apache.org/jira/browse/SPARK-31399 > Project: Spark > Issue Type: Bug > Components: Spark Core > Affects Versions: 3.0.0 > Reporter: Wenchen Fan > Priority: Blocker > > The `ClosureCleaner` only supports Scala functions and it uses the following > check to catch closures: > {code} > // Check whether a class represents a Scala closure > private def isClosure(cls: Class[_]): Boolean = { > cls.getName.contains("$anonfun$") > } > {code} > This no longer works in 3.0 because we upgraded to Scala 2.12, where most Scala > functions become Java lambdas. 
> As an example, the following code works well in Spark 2.4 Spark Shell: > {code} > scala> :pa > // Entering paste mode (ctrl-D to finish) > import org.apache.spark.sql.functions.lit > case class Foo(id: String) > val col = lit("123") > val df = sc.range(0,10,1,1).map { _ => Foo("") } > // Exiting paste mode, now interpreting. > import org.apache.spark.sql.functions.lit > defined class Foo > col: org.apache.spark.sql.Column = 123 > df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20 > {code} > But fails in 3.0 > {code} > scala> :pa > // Entering paste mode (ctrl-D to finish) > import org.apache.spark.sql.functions.lit > case class Foo(id: String) > val col = lit("123") > val df = sc.range(0,10,1,1).map { _ => Foo("") } > // Exiting paste mode, now interpreting. > org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2371) > at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) > at org.apache.spark.rdd.RDD.map(RDD.scala:421) > ... 
39 elided > Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column > Serialization stack: > - object not serializable (class: org.apache.spark.sql.Column, value: > 123) > - field (class: $iw, name: col, type: class org.apache.spark.sql.Column) > - object (class $iw, $iw@2d87ac2b) > - element of array (index: 0) > - array (class [Ljava.lang.Object;, size 1) > - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, > type: class [Ljava.lang.Object;) > - object (class java.lang.invoke.SerializedLambda, > SerializedLambda[capturingClass=class $iw, > functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, > implementation=invokeStatic > $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, > instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1]) > - writeReplace data (class: java.lang.invoke.SerializedLambda) > - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43) > at > org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101) > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393) > ... 47 more > {code} > **Apache Spark 2.4.5 with Scala 2.12** > {code} > Welcome to > ____ __ > / __/__ ___ _____/ /__ > _\ \/ _ \/ _ `/ __/ '_/ > /___/ .__/\_,_/_/ /_/\_\ version 2.4.5 > /_/ > Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242) > Type in expressions to have them evaluated. > Type :help for more information. > scala> :pa > // Entering paste mode (ctrl-D to finish) > import org.apache.spark.sql.functions.lit > case class Foo(id: String) > val col = lit("123") > val df = sc.range(0,10,1,1).map { _ => Foo("") } > // Exiting paste mode, now interpreting. 
> org.apache.spark.SparkException: Task not serializable > at > org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393) > at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162) > at org.apache.spark.SparkContext.clean(SparkContext.scala:2326) > at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:393) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) > at org.apache.spark.rdd.RDD.withScope(RDD.scala:385) > at org.apache.spark.rdd.RDD.map(RDD.scala:392) > ... 45 elided > Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column > Serialization stack: > - object not serializable (class: org.apache.spark.sql.Column, value: > 123) > - field (class: $iw, name: col, type: class org.apache.spark.sql.Column) > - object (class $iw, $iw@73534675) > - element of array (index: 0) > - array (class [Ljava.lang.Object;, size 1) > - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, > type: class [Ljava.lang.Object;) > - object (class java.lang.invoke.SerializedLambda, > SerializedLambda[capturingClass=class $iw, > functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, > implementation=invokeStatic > $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, > instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1]) > - writeReplace data (class: java.lang.invoke.SerializedLambda) > - object (class $Lambda$1952/356563238, $Lambda$1952/356563238@6ca95b1e) > at > org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41) > at > org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) > at > org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) > at > 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:400) > ... 53 more > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org