[ https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
sam updated SPARK-26534:
------------------------
    Description: 
I've found a strange combination of closures where the closure cleaner doesn't seem to be smart enough to remove a reference that is never used; that is, we get an `org.apache.spark.SparkException: Task not serializable` for a task that is perfectly serializable.

In the example below, the only `val` the `map` closure actually needs is `foo`, yet Spark tries to serialise `thingy` as well. What is odd is that changing the code in a number of subtle ways makes the error go away; I've tried to highlight these with inline comments. (Two small sketches based on these observations follow the trace below.)

{code:java}
import org.apache.spark.sql._

object Test {
  val sparkSession: SparkSession =
    SparkSession.builder.master("local").appName("app").getOrCreate()

  def apply(): Unit = {
    import sparkSession.implicits._

    val landedData: Dataset[String] =
      sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()

    // thingy has to be in this outer scope to reproduce; if it is inside someFunc, cannot reproduce
    val thingy: Thingy = new Thingy

    // If not wrapped in someFunc, cannot reproduce
    val someFunc = () => {
      // If foo is not referenced inside the closure (e.g. just use the identity function), cannot reproduce
      val foo: String = "foo"

      thingy.run(block = () => {
        landedData.map(r => {
          r + foo
        })
        .count()
      })
    }

    someFunc()
  }
}

class Thingy {
  def run[R](block: () => R): R = {
    block()
  }
}
{code}

The full trace, if run in `sbt console`:

{code}
scala> class Thingy {
     |   def run[R](block: () => R): R = {
     |     block()
     |   }
     | }
defined class Thingy

scala> 

scala> object Test {
     |   val sparkSession: SparkSession =
     |     SparkSession.builder.master("local").appName("app").getOrCreate()
     | 
     |   def apply(): Unit = {
     |     import sparkSession.implicits._
     | 
     |     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
     | 
     |     // thingy has to be in this outer scope to reproduce, if in someFunc, cannot reproduce
     |     val thingy: Thingy = new Thingy
     | 
     |     // If not wrapped in someFunc cannot reproduce
     |     val someFunc = () => {
     |       // If don't reference this foo inside the closer (e.g. just use identity function) cannot reproduce
     |       val foo: String = "foo"
     | 
     |       thingy.run(block = () => {
     |         landedData.map(r => {
     |           r + foo
     |         })
     |         .count()
     |       })
     |     }
     | 
     |     someFunc()
     | 
     |   }
     | }
defined object Test

scala> 

scala> 

scala> Test.apply()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/01/07 11:27:20 INFO SparkContext: Submitted application: app
19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to: 
19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to: 
19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(sams); groups with view permissions: Set(); users with modify permissions: Set(sams); groups with modify permissions: Set()
19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on port 54066.
19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/01/07 11:27:20 INFO DiskBlockManager: Created local directory at /private/var/folders/x9/r21b5ttd1wx8zq9qtckfp411n7085c/T/blockmgr-c35bdd46-4804-427b-a513-ee8778814f88
19/01/07 11:27:20 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/01/07 11:27:20 INFO SparkEnv: Registering OutputCommitCoordinator
19/01/07 11:27:20 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/01/07 11:27:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://10.197.196.44:4040
19/01/07 11:27:21 INFO Executor: Starting executor ID driver on host localhost
19/01/07 11:27:21 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54067.
19/01/07 11:27:21 INFO NettyBlockTransferService: Server created on 10.197.196.44:54067
19/01/07 11:27:21 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/01/07 11:27:21 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.197.196.44, 54067, None)
19/01/07 11:27:21 INFO BlockManagerMasterEndpoint: Registering block manager 10.197.196.44:54067 with 366.3 MB RAM, BlockManagerId(driver, 10.197.196.44, 54067, None)
19/01/07 11:27:21 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.197.196.44, 54067, None)
19/01/07 11:27:21 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.197.196.44, 54067, None)
19/01/07 11:27:23 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/').
19/01/07 11:27:23 INFO SharedState: Warehouse path is 'file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/'.
19/01/07 11:27:23 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
19/01/07 11:27:24 INFO CodeGenerator: Code generated in 257.291388 ms
19/01/07 11:27:24 INFO CodeGenerator: Code generated in 33.985273 ms
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
  at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
  at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(<console>:36)
  at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
  at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
  at Thingy.run(<console>:16)
  at Test$$anonfun$1.apply$mcJ$sp(<console>:32)
  at Test$.apply(<console>:40)
  ... 40 elided
Caused by: java.io.NotSerializableException: Thingy
Serialization stack:
  - object not serializable (class: Thingy, value: Thingy@679723a6)
  - field (class: Test$$anonfun$1, name: thingy$1, type: class Thingy)
  - object (class Test$$anonfun$1, <function0>)
  - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, name: $outer, type: class Test$$anonfun$1)
  - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, <function0>)
  - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, name: $outer, type: class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1)
  - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, <function1>)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 3)
  - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
  - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
  ... 90 more

scala> 
{code}
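For reference, two small sketches that follow directly from the observations above; the object names are hypothetical and nothing here has been verified beyond the inline comments in the repro. The first is a workaround that simply declares `thingy` inside `someFunc`, which per those comments does not reproduce the error (making `Thingy` extend `Serializable` would presumably also sidestep the `java.io.NotSerializableException`, at the cost of shipping an object the task never uses):

{code:java}
import org.apache.spark.sql._

// Hypothetical sketch of a workaround, reusing the Thingy class defined above.
object TestWorkaround {
  val sparkSession: SparkSession =
    SparkSession.builder.master("local").appName("app").getOrCreate()

  def apply(): Unit = {
    import sparkSession.implicits._

    val landedData: Dataset[String] =
      sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()

    val someFunc = () => {
      // Per the inline comments in the report, declaring thingy *inside* someFunc
      // (rather than in the enclosing scope) does not reproduce the error.
      val thingy: Thingy = new Thingy
      val foo: String = "foo"

      thingy.run(block = () => {
        landedData.map(r => r + foo).count()
      })
    }

    someFunc()
  }
}
{code}

The second is a Spark-free check of the claim that the task itself is serializable: a function that closes over only `foo` (which is all the `map` lambda actually uses) should serialize fine with plain Java serialization, which is the sense in which the failure comes entirely from the captured-but-unused `thingy` reference:

{code:java}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical check, not part of the original report.
object MapClosureCheck {
  def main(args: Array[String]): Unit = {
    val foo: String = "foo"
    val f: String => String = r => r + foo   // same logic as the map closure; no Thingy captured

    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(f)                       // expected to succeed: only a String is captured
    out.close()
    println("map closure serialized without error")
  }
}
{code}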
> Closure Cleaner Bug
> -------------------
>
>                 Key: SPARK-26534
>                 URL: https://issues.apache.org/jira/browse/SPARK-26534
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: sam
>            Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org