[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17110888#comment-17110888 ]

Apache Spark commented on SPARK-31399:
--------------------------------------

User 'rednaxelafx' has created a pull request for this issue:
https://github.com/apache/spark/pull/28577

> Closure cleaner broken in Scala 2.12
> ------------------------------------
>
>                 Key: SPARK-31399
>                 URL: https://issues.apache.org/jira/browse/SPARK-31399
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.5, 3.0.0
>            Reporter: Wenchen Fan
>            Assignee: Kris Mok
>            Priority: Blocker
>             Fix For: 3.0.0
>
>
> The `ClosureCleaner` only supports Scala functions, and it uses the following check to catch closures:
> {code}
> // Check whether a class represents a Scala closure
> private def isClosure(cls: Class[_]): Boolean = {
>   cls.getName.contains("$anonfun$")
> }
> {code}
> This no longer works in 3.0: with the upgrade to Scala 2.12, most Scala functions are compiled to Java lambdas.
> As an example, the following code works well in the Spark 2.4 Spark Shell:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> import org.apache.spark.sql.functions.lit
> defined class Foo
> col: org.apache.spark.sql.Column = 123
> df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <console>:20
> {code}
> But it fails in 3.0:
> {code}
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
>   at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
>   at org.apache.spark.rdd.RDD.map(RDD.scala:421)
>   ... 39 elided
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>   - object not serializable (class: org.apache.spark.sql.Column, value: 123)
>   - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
>   - object (class $iw, $iw@2d87ac2b)
>   - element of array (index: 0)
>   - array (class [Ljava.lang.Object;, size 1)
>   - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
>   - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
>   - writeReplace data (class: java.lang.invoke.SerializedLambda)
>   - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
>   ... 47 more
> {code}
> **Apache Spark 2.4.5 with Scala 2.12**
> {code}
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /___/ .__/\_,_/_/ /_/\_\   version 2.4.5
>       /_/
>
> Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
> Type in expressions to have them evaluated.
> Type :help for more information.
>
> scala> :pa
> // Entering paste mode (ctrl-D to finish)
> import org.apache.spark.sql.functions.lit
> case class Foo(id: String)
> val col = lit("123")
> val df = sc.range(0,10,1,1).map { _ => Foo("") }
> // Exiting paste mode, now interpreting.
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:403)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:393)
>   at
> {code}
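For context on the lambda point in the quoted description: under Scala 2.12 a closure such as the `map` function above is a class generated by the JDK's LambdaMetafactory, so the name-based `$anonfun$` check never matches it; what such a class does have is a synthetic `writeReplace` method that returns the `java.lang.invoke.SerializedLambda` seen in the serialization stack. Below is a minimal, REPL-pasteable sketch of detecting that shape reflectively. It assumes Scala 2.12 on Java 8, and the `asSerializedLambda` helper is purely illustrative, not Spark's actual `ClosureCleaner` code.

{code}
import java.lang.invoke.SerializedLambda

// Illustrative helper (not part of Spark): returns the SerializedLambda proxy
// of a serializable LMF-generated lambda, or None for ordinary objects and
// for pre-2.12 "$anonfun$" closure classes.
def asSerializedLambda(closure: AnyRef): Option[SerializedLambda] = {
  try {
    // Serializable lambdas carry a synthetic writeReplace() method that
    // returns their java.lang.invoke.SerializedLambda form.
    val m = closure.getClass.getDeclaredMethod("writeReplace")
    m.setAccessible(true)
    m.invoke(closure) match {
      case sl: SerializedLambda => Some(sl)
      case _                    => None
    }
  } catch {
    case _: NoSuchMethodException => None
  }
}

// A plain Scala 2.12 lambda is recognized, and its SerializedLambda exposes
// the pieces a closure cleaner would need to inspect.
val f: Int => Int = x => x + 1
asSerializedLambda(f).foreach { sl =>
  println(sl.getCapturingClass)   // class that hosts the lambda body
  println(sl.getImplMethodName)   // compiler-generated implementation method
  println(sl.getCapturedArgCount) // number of captured enclosing values
}
{code}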
[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17110889#comment-17110889 ]

Apache Spark commented on SPARK-31399:
--------------------------------------

User 'rednaxelafx' has created a pull request for this issue:
https://github.com/apache/spark/pull/28577
[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17100721#comment-17100721 ]

Apache Spark commented on SPARK-31399:
--------------------------------------

User 'rednaxelafx' has created a pull request for this issue:
https://github.com/apache/spark/pull/28463
[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096629#comment-17096629 ]

Dongjoon Hyun commented on SPARK-31399:
---------------------------------------

Thank you so much, [~rednaxelafx].
[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17096191#comment-17096191 ]

Kris Mok commented on SPARK-31399:
----------------------------------

Hi [~dongjoon], I've been working on a fix for this issue and will send out a WIP PR as soon as possible.

I've done an analysis of the situation in parallel to [~joshrosen]'s analysis above and arrived at very similar conclusions. The key fact is that Scala 2.12+'s indylambda (aka LMF-based) closures still have an equivalent of an "$outer", just under a different name. The logic in the `ClosureCleaner` that supports Scala 2.11 closures therefore has to be ported essentially verbatim to Scala 2.12+/indylambda. That's exactly what I'm working on right now, and it's the main content of the WIP PR.

A separate issue is that test coverage of the ClosureCleaner in the Spark repo is insufficient. There needs to be a separate suite, similar to `ReplSuite`, that fires up an actual Scala REPL and triggers the ClosureCleaner from it, to close that coverage gap. I will do that as a second step of the PR; once the new test suite is in, the PR can be considered complete and ready for final review.
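To make the "equivalent of an `$outer`" concrete with the example from the description: the non-serializable `Column` is not a field of the lambda object itself; it sits on the enclosing `$iw` REPL line object, which appears as `capturedArgs` index 0 of the `SerializedLambda` in the serialization stack quoted above. Here is a rough sketch of surfacing that capture, reusing the illustrative `asSerializedLambda` helper from the earlier note; it is not the actual ClosureCleaner logic, which additionally has to work out which captured state the lambda body really needs before cloning or nulling anything.

{code}
// Illustrative only: list what an indylambda closure captured. For the REPL
// example in the description, the single captured argument is the enclosing
// $iw instance whose `col` field holds the non-serializable Column, playing
// the same role as a 2.11 closure's $outer chain.
def describeCaptures(closure: AnyRef): Seq[String] =
  asSerializedLambda(closure).toSeq.flatMap { sl =>
    (0 until sl.getCapturedArgCount).map { i =>
      val arg = sl.getCapturedArg(i)
      s"captured arg $i: ${if (arg == null) "null" else arg.getClass.getName}"
    }
  }

// In the Spark shell: describeCaptures(theMapFunction).foreach(println)
// (theMapFunction is a stand-in name for the closure passed to map above.)
{code}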
[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17092836#comment-17092836 ]

Dongjoon Hyun commented on SPARK-31399:
---------------------------------------

Hi, [~smilegator] and [~rednaxelafx]. Is there any update on this Blocker issue? Thank you in advance for any update!
[jira] [Commented] (SPARK-31399) Closure cleaner broken in Scala 2.12
[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17084356#comment-17084356 ]

Xiao Li commented on SPARK-31399:
---------------------------------

[~rednaxelafx] will help with this ticket and do more investigation.