Wenchen Fan created SPARK-31399:
-----------------------------------

             Summary: closure cleaner is broken in Spark 3.0
                 Key: SPARK-31399
                 URL: https://issues.apache.org/jira/browse/SPARK-31399
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: Wenchen Fan
The `ClosureCleaner` only supports Scala functions, and it uses the following check to catch closures:

{code}
// Check whether a class represents a Scala closure
private def isClosure(cls: Class[_]): Boolean = {
  cls.getName.contains("$anonfun$")
}
{code}

This check no longer works in 3.0: with the upgrade to Scala 2.12, most Scala functions are compiled to Java lambdas, whose class names do not contain "$anonfun$" (see the sketch after the stack trace below). As an example, the following code works well in the Spark 2.4 shell:

{code}
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit
case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.functions.lit
defined class Foo
col: org.apache.spark.sql.Column = 123
df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
{code}

but fails in 3.0:

{code}
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit
case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.map(RDD.scala:421)
  ... 39 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
	- object not serializable (class: org.apache.spark.sql.Column, value: 123)
	- field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
	- object (class $iw, $iw@2d87ac2b)
	- element of array (index: 0)
	- array (class [Ljava.lang.Object;, size 1)
	- field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
	- object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
	- writeReplace data (class: java.lang.invoke.SerializedLambda)
	- object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 47 more
{code}

As the serialization stack shows, the lambda captures the enclosing REPL line object ($iw), which also holds the unused, non-serializable `Column` named `col`. Because the name-based check no longer recognizes the lambda as a closure, the cleaner does not null out that captured reference before serialization.
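As a self-contained illustration of the failing check, here is a minimal sketch that can be run outside the REPL. The object name and printed class names are illustrative, and the `writeReplace` probe at the end is only one conceivable way such lambdas could be recognized, an assumption here rather than a confirmed fix for this ticket:

{code}
object ClosureCheckDemo {
  // Name-based check copied from ClosureCleaner, for illustration only.
  private def isClosure(cls: Class[_]): Boolean =
    cls.getName.contains("$anonfun$")

  def main(args: Array[String]): Unit = {
    val f: Int => Int = x => x + 1

    // Scala 2.11: prints an anonymous-class name like "ClosureCheckDemo$$anonfun$1",
    // so isClosure is true. Scala 2.12: prints a JVM lambda class name containing
    // "$$Lambda$" instead, so isClosure is false and the cleaner skips the function.
    println(f.getClass.getName)
    println(s"isClosure = ${isClosure(f.getClass)}")

    // A 2.12 lambda is still serializable: LambdaMetafactory generates a private
    // writeReplace method that returns a java.lang.invoke.SerializedLambda.
    // Probing for it is one possible detection strategy (an assumption, not
    // necessarily what the eventual fix will do).
    val writeReplace = f.getClass.getDeclaredMethod("writeReplace")
    writeReplace.setAccessible(true)
    println(writeReplace.invoke(f).getClass.getName) // java.lang.invoke.SerializedLambda
  }
}
{code}

Running this under Scala 2.12 shows both halves of the problem at once: the old name check returns false for the lambda, while the SerializedLambda proxy (the same structure visible in the stack trace above) is still readily available via reflection.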