Wenchen Fan created SPARK-31399:
-----------------------------------

             Summary: closure cleaner is broken in Spark 3.0
                 Key: SPARK-31399
                 URL: https://issues.apache.org/jira/browse/SPARK-31399
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.0.0
            Reporter: Wenchen Fan


The `ClosureCleaner` only support Scala functions and it uses the following 
check to catch closures
{code}
  // Check whether a class represents a Scala closure
  private def isClosure(cls: Class[_]): Boolean = {
    cls.getName.contains("$anonfun$")
  }
{code}

This doesn't work in 3.0 anyway as we upgrade to Scala 2.12 and most Scala 
functions become Java lambdas.

As an example, the following code works well in Spark 2.4 Spark Shell:
{code}
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit

case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.functions.lit
defined class Foo
col: org.apache.spark.sql.Column = 123
df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
{code}

But fails in 3.0
{code}
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit

case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

org.apache.spark.SparkException: Task not serializable
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.map(RDD.scala:421)
  ... 39 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.Column, value: 
123)
        - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
        - object (class $iw, $iw@2d87ac2b)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, 
type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, 
SerializedLambda[capturingClass=class $iw, 
functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;,
 implementation=invokeStatic 
$anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, 
instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
  at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 47 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to