[ https://issues.apache.org/jira/browse/SPARK-31399?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenchen Fan updated SPARK-31399:
--------------------------------
    Description: 
The `ClosureCleaner` only supports Scala functions, and it uses the following
check to detect closures:
{code}
  // Check whether a class represents a Scala closure
  private def isClosure(cls: Class[_]): Boolean = {
    cls.getName.contains("$anonfun$")
  }
{code}

This no longer works in 3.0: with the upgrade to Scala 2.12, most Scala
functions are compiled as Java lambdas, whose runtime class names do not match
this pattern.
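
To illustrate the mismatch, here is a small standalone sketch (not taken from the Spark code base): on Scala 2.12 a function literal is compiled through `LambdaMetafactory`, so its runtime class name looks like `...$$Lambda$<n>` rather than containing `$anonfun$`, and the name-based check above returns false for it.
{code}
// Standalone sketch (not Spark code): shows that the name-based check misses
// Scala 2.12 closures, because the runtime class produced by LambdaMetafactory
// has no "$anonfun$" in its name (only the static implementation method keeps
// that prefix).
object NameCheckDemo {
  private def isClosure(cls: Class[_]): Boolean =
    cls.getName.contains("$anonfun$")

  def main(args: Array[String]): Unit = {
    val f = (x: Int) => x + 1
    // On 2.12 this prints something like "NameCheckDemo$$$Lambda$14/...",
    // while a 2.11-style closure class would be "NameCheckDemo$$anonfun$...".
    println(f.getClass.getName)
    println(isClosure(f.getClass)) // false on 2.12, true for 2.11-style closures
  }
}
{code}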

As an example, the following code runs fine in the Spark 2.4 shell:
{code}
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit

case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.functions.lit
defined class Foo
col: org.apache.spark.sql.Column = 123
df: org.apache.spark.rdd.RDD[Foo] = MapPartitionsRDD[5] at map at <pastie>:20
{code}

But the same code fails in 3.0, because the cleaner does not recognize the Java lambda and therefore cannot null out the captured REPL wrapper ($iw) that holds the non-serializable Column:
{code}
scala> :pa
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.functions.lit

case class Foo(id: String)
val col = lit("123")
val df = sc.range(0,10,1,1).map { _ => Foo("") }

// Exiting paste mode, now interpreting.

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:396)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:386)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2371)
  at org.apache.spark.rdd.RDD.$anonfun$map$1(RDD.scala:422)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
  at org.apache.spark.rdd.RDD.map(RDD.scala:421)
  ... 39 elided
Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.Column, value: 123)
        - field (class: $iw, name: col, type: class org.apache.spark.sql.Column)
        - object (class $iw, $iw@2d87ac2b)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 1)
        - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
        - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$df$1$adapted:(L$iw;Ljava/lang/Object;)LFoo;, instantiatedMethodType=(Ljava/lang/Object;)LFoo;, numCaptured=1])
        - writeReplace data (class: java.lang.invoke.SerializedLambda)
        - object (class $Lambda$2438/170049100, $Lambda$2438/170049100@d6b8c43)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:393)
  ... 47 more
{code}
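
The serialization stack above already points at the root cause: the lambda's `SerializedLambda` representation captures the REPL wrapper `$iw`, which carries the non-serializable `Column`. For reference, here is a hedged sketch (not the proposed fix) of how the capture of a Scala 2.12 lambda can be inspected, by reflecting on the private `writeReplace` method that serializable lambdas expose and which returns a `java.lang.invoke.SerializedLambda`:
{code}
import java.lang.invoke.SerializedLambda

// Sketch, not the proposed fix: list what a Scala 2.12 lambda captured.
// Scala 2.12 function literals are compiled as serializable lambdas, so the
// generated class carries a private writeReplace method returning a
// SerializedLambda whose capturedArgs are exactly what gets serialized.
def describeCapture(closure: AnyRef): Unit = {
  val writeReplace = closure.getClass.getDeclaredMethod("writeReplace")
  writeReplace.setAccessible(true)
  val sl = writeReplace.invoke(closure).asInstanceOf[SerializedLambda]
  println(s"capturing class: ${sl.getCapturingClass}")
  println(s"impl method:     ${sl.getImplMethodName}")
  // Each captured argument is serialized with the task; in the example above
  // it is the $iw wrapper whose `col` field is the non-serializable Column.
  (0 until sl.getCapturedArgCount).foreach { i =>
    println(s"captured arg $i: ${sl.getCapturedArg(i).getClass.getName}")
  }
}

// In the shell from the example, e.g.: describeCapture((_: Long) => Foo(""))
{code}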

> closure cleaner is broken in Spark 3.0
> --------------------------------------
>
>                 Key: SPARK-31399
>                 URL: https://issues.apache.org/jira/browse/SPARK-31399
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Wenchen Fan
>            Priority: Major
>


