[ 
https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam updated SPARK-26534:
------------------------
    Description: 
I've found a strange combination of closures where the closure cleaner doesn't 
seem to be smart enough to figure out how to remove a reference that is not 
used. I.e. we get a `org.apache.spark.SparkException: Task not serializable` 
for a Task that is perfectly serializable.  

 

In the example below, the only `val` that is actually needed for the closure of 
the `map` is `foo`, but Spark still tries to serialise `thingy`.  What is odd is 
that changing this code in any of a number of subtle ways eliminates the error; 
I've tried to highlight these variations using inline comments.

 
{code:java}
import org.apache.spark.sql._

object Test {
  val sparkSession: SparkSession =
    SparkSession.builder.master("local").appName("app").getOrCreate()

  def apply(): Unit = {
    import sparkSession.implicits._

    val landedData: Dataset[String] = 
sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()

    // thingy has to be declared in this outer scope to reproduce; if it is
    // declared inside someFunc, the error cannot be reproduced
    val thingy: Thingy = new Thingy

    // If not wrapped in someFunc cannot reproduce
    val someFunc = () => {
      // If foo is not referenced inside the closure (e.g. the map just uses
      // the identity function), the error cannot be reproduced
      val foo: String = "foo"

      thingy.run(block = () => {
        landedData.map(r => {
          r + foo
        })
        .count()
      })
    }

    someFunc()

  }
}

class Thingy {
  def run[R](block: () => R): R = {
    block()
  }
}
{code}

The full trace when run in `sbt console`:

{code}
scala> class Thingy {
     |   def run[R](block: () => R): R = {
     |     block()
     |   }
     | }
defined class Thingy

scala> 

scala> object Test {
     |   val sparkSession: SparkSession =
     |     SparkSession.builder.master("local").appName("app").getOrCreate()
     | 
     |   def apply(): Unit = {
     |     import sparkSession.implicits._
     | 
     |     val landedData: Dataset[String] = 
sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
     | 
     |     // thingy has to be in this outer scope to reproduce, if in 
someFunc, cannot reproduce
     |     val thingy: Thingy = new Thingy
     | 
     |     // If not wrapped in someFunc cannot reproduce
     |     val someFunc = () => {
     |       // If don't reference this foo inside the closer (e.g. just use 
identity function) cannot reproduce
     |       val foo: String = "foo"
     | 
     |       thingy.run(block = () => {
     |         landedData.map(r => {
     |           r + foo
     |         })
     |         .count()
     |       })
     |     }
     | 
     |     someFunc()
     | 
     |   }
     | }
defined object Test

scala> 

scala> 

scala> Test.apply()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
19/01/07 11:27:20 INFO SparkContext: Submitted application: app
19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to: 
19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to: 
19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users  with view permissions: Set(sams); groups 
with view permissions: Set(); users  with modify permissions: Set(sams); groups 
with modify permissions: Set()
19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on 
port 54066.
19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using 
org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/01/07 11:27:20 INFO DiskBlockManager: Created local directory at 
/private/var/folders/x9/r21b5ttd1wx8zq9qtckfp411n7085c/T/blockmgr-c35bdd46-4804-427b-a513-ee8778814f88
19/01/07 11:27:20 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/01/07 11:27:20 INFO SparkEnv: Registering OutputCommitCoordinator
19/01/07 11:27:20 INFO Utils: Successfully started service 'SparkUI' on port 
4040.
19/01/07 11:27:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
http://10.197.196.44:4040
19/01/07 11:27:21 INFO Executor: Starting executor ID driver on host localhost
19/01/07 11:27:21 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 54067.
19/01/07 11:27:21 INFO NettyBlockTransferService: Server created on 
10.197.196.44:54067
19/01/07 11:27:21 INFO BlockManager: Using 
org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
policy
19/01/07 11:27:21 INFO BlockManagerMaster: Registering BlockManager 
BlockManagerId(driver, 10.197.196.44, 54067, None)
19/01/07 11:27:21 INFO BlockManagerMasterEndpoint: Registering block manager 
10.197.196.44:54067 with 366.3 MB RAM, BlockManagerId(driver, 10.197.196.44, 
54067, None)
19/01/07 11:27:21 INFO BlockManagerMaster: Registered BlockManager 
BlockManagerId(driver, 10.197.196.44, 54067, None)
19/01/07 11:27:21 INFO BlockManager: Initialized BlockManager: 
BlockManagerId(driver, 10.197.196.44, 54067, None)
19/01/07 11:27:23 INFO SharedState: Setting hive.metastore.warehouse.dir 
('null') to the value of spark.sql.warehouse.dir 
('file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/').
19/01/07 11:27:23 INFO SharedState: Warehouse path is 
'file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/'.
19/01/07 11:27:23 INFO StateStoreCoordinatorRef: Registered 
StateStoreCoordinator endpoint
19/01/07 11:27:24 INFO CodeGenerator: Code generated in 257.291388 ms
19/01/07 11:27:24 INFO CodeGenerator: Code generated in 33.985273 ms
org.apache.spark.SparkException: Task not serializable
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
  at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
  at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
  at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
  at 
org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
  at 
org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at 
org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at 
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
  at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(<console>:36)
  at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
  at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
  at Thingy.run(<console>:16)
  at Test$$anonfun$1.apply$mcJ$sp(<console>:32)
  at Test$.apply(<console>:40)
  ... 40 elided
Caused by: java.io.NotSerializableException: Thingy
Serialization stack:
        - object not serializable (class: Thingy, value: Thingy@679723a6)
        - field (class: Test$$anonfun$1, name: thingy$1, type: class Thingy)
        - object (class Test$$anonfun$1, <function0>)
        - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, name: $outer, 
type: class Test$$anonfun$1)
        - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, <function0>)
        - field (class: 
Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, name: $outer, 
type: class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1)
        - object (class 
Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, <function1>)
        - element of array (index: 0)
        - array (class [Ljava.lang.Object;, size 3)
        - field (class: 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: 
references$1, type: class [Ljava.lang.Object;)
        - object (class 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
  at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
  ... 90 more

scala> 
{code}


  was:
I've found a strange combination of closures where the closure cleaner doesn't 
seem to be smart enough to figure out how to remove a reference that is not 
used. I.e. we get a `org.apache.spark.SparkException: Task not serializable` 
for a Task that is perfectly serializable.  

 

In the example below, the only `val` that is actually needed for the closure of 
the `map` is `foo`, but it tries to serialise `thingy`.  What is odd is 
changing this code in a number of subtle ways eliminates the error, which I've 
tried to highlight using comments inline.

 
{code:java}
import org.apache.spark.sql._

object Test {
  val sparkSession: SparkSession =
    SparkSession.builder.master("local").appName("app").getOrCreate()

  def apply(): Unit = {
    import sparkSession.implicits._

    val landedData: Dataset[String] = 
sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()

    // thingy has to be in this outer scope to reproduce, if in someFunc, 
cannot reproduce
    val thingy: Thingy = new Thingy

    // If not wrapped in someFunc cannot reproduce
    val someFunc = () => {
      // If don't reference this foo inside the closer (e.g. just use identity 
function) cannot reproduce
      val foo: String = "foo"

      thingy.run(block = () => {
        landedData.map(r => {
          r + foo
        })
        .count()
      })
    }

    someFunc()

  }
}

class Thingy {
  def run[R](block: () => R): R = {
    block()
  }
}
{code}


> Closure Cleaner Bug
> -------------------
>
>                 Key: SPARK-26534
>                 URL: https://issues.apache.org/jira/browse/SPARK-26534
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: sam
>            Priority: Major
>
> I've found a strange combination of closures where the closure cleaner 
> doesn't seem to be smart enough to figure out how to remove a reference that 
> is not used. I.e. we get a `org.apache.spark.SparkException: Task not 
> serializable` for a Task that is perfectly serializable.  
>  
> In the example below, the only `val` that is actually needed for the closure 
> of the `map` is `foo`, but Spark still tries to serialise `thingy`.  What is 
> odd is that changing this code in any of a number of subtle ways eliminates 
> the error; I've tried to highlight these variations using inline comments.
>  
> {code:java}
> import org.apache.spark.sql._
> object Test {
>   val sparkSession: SparkSession =
>     SparkSession.builder.master("local").appName("app").getOrCreate()
>   def apply(): Unit = {
>     import sparkSession.implicits._
>     val landedData: Dataset[String] = 
> sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>     // thingy has to be declared in this outer scope to reproduce; if it is
>     // declared inside someFunc, the error cannot be reproduced
>     val thingy: Thingy = new Thingy
>     // If not wrapped in someFunc cannot reproduce
>     val someFunc = () => {
>       // If foo is not referenced inside the closure (e.g. the map just uses
>       // the identity function), the error cannot be reproduced
>       val foo: String = "foo"
>       thingy.run(block = () => {
>         landedData.map(r => {
>           r + foo
>         })
>         .count()
>       })
>     }
>     someFunc()
>   }
> }
> class Thingy {
>   def run[R](block: () => R): R = {
>     block()
>   }
> }
> {code}
> The full trace when run in `sbt console`:
> {code}
> scala> class Thingy {
>      |   def run[R](block: () => R): R = {
>      |     block()
>      |   }
>      | }
> defined class Thingy
> scala> 
> scala> object Test {
>      |   val sparkSession: SparkSession =
>      |     SparkSession.builder.master("local").appName("app").getOrCreate()
>      | 
>      |   def apply(): Unit = {
>      |     import sparkSession.implicits._
>      | 
>      |     val landedData: Dataset[String] = 
> sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>      | 
>      |     // thingy has to be in this outer scope to reproduce, if in 
> someFunc, cannot reproduce
>      |     val thingy: Thingy = new Thingy
>      | 
>      |     // If not wrapped in someFunc cannot reproduce
>      |     val someFunc = () => {
>      |       // If don't reference this foo inside the closer (e.g. just use 
> identity function) cannot reproduce
>      |       val foo: String = "foo"
>      | 
>      |       thingy.run(block = () => {
>      |         landedData.map(r => {
>      |           r + foo
>      |         })
>      |         .count()
>      |       })
>      |     }
>      | 
>      |     someFunc()
>      | 
>      |   }
>      | }
> defined object Test
> scala> 
> scala> 
> scala> Test.apply()
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
> 19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 19/01/07 11:27:20 INFO SparkContext: Submitted application: app
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users  with view permissions: Set(sams); groups 
> with view permissions: Set(); users  with modify permissions: Set(sams); 
> groups with modify permissions: Set()
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on 
> port 54066.
> 19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
> 19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using 
> org.apache.spark.storage.DefaultTopologyMapper for getting topology 
> information
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint 
> up
> 19/01/07 11:27:20 INFO DiskBlockManager: Created local directory at 
> /private/var/folders/x9/r21b5ttd1wx8zq9qtckfp411n7085c/T/blockmgr-c35bdd46-4804-427b-a513-ee8778814f88
> 19/01/07 11:27:20 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
> 19/01/07 11:27:20 INFO SparkEnv: Registering OutputCommitCoordinator
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 19/01/07 11:27:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
> http://10.197.196.44:4040
> 19/01/07 11:27:21 INFO Executor: Starting executor ID driver on host localhost
> 19/01/07 11:27:21 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54067.
> 19/01/07 11:27:21 INFO NettyBlockTransferService: Server created on 
> 10.197.196.44:54067
> 19/01/07 11:27:21 INFO BlockManager: Using 
> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
> policy
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registering BlockManager 
> BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMasterEndpoint: Registering block manager 
> 10.197.196.44:54067 with 366.3 MB RAM, BlockManagerId(driver, 10.197.196.44, 
> 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registered BlockManager 
> BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManager: Initialized BlockManager: 
> BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:23 INFO SharedState: Setting hive.metastore.warehouse.dir 
> ('null') to the value of spark.sql.warehouse.dir 
> ('file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/').
> 19/01/07 11:27:23 INFO SharedState: Warehouse path is 
> 'file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/'.
> 19/01/07 11:27:23 INFO StateStoreCoordinatorRef: Registered 
> StateStoreCoordinator endpoint
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 257.291388 ms
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 33.985273 ms
> org.apache.spark.SparkException: Task not serializable
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
>   at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
>   at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>   at 
> org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>   at 
> org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
>   at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
>   at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(<console>:36)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
>   at Thingy.run(<console>:16)
>   at Test$$anonfun$1.apply$mcJ$sp(<console>:32)
>   at Test$.apply(<console>:40)
>   ... 40 elided
> Caused by: java.io.NotSerializableException: Thingy
> Serialization stack:
>       - object not serializable (class: Thingy, value: Thingy@679723a6)
>       - field (class: Test$$anonfun$1, name: thingy$1, type: class Thingy)
>       - object (class Test$$anonfun$1, <function0>)
>       - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, name: $outer, 
> type: class Test$$anonfun$1)
>       - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, <function0>)
>       - field (class: 
> Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, name: 
> $outer, type: class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1)
>       - object (class 
> Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, <function1>)
>       - element of array (index: 0)
>       - array (class [Ljava.lang.Object;, size 3)
>       - field (class: 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: 
> references$1, type: class [Ljava.lang.Object;)
>       - object (class 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
>   at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
>   at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>   at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
>   ... 90 more
> scala> 
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to