[ 
https://issues.apache.org/jira/browse/SPARK-26534?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sean Owen updated SPARK-26534:
------------------------------
    Priority: Minor  (was: Major)

Yes, the closure cleaner has never been able to be 100% sure it gets all the 
references. It also can't null out some references, as that would modify the 
state of other objects (think of references to objects that are shared by 
other objects). This also partly depends on how Scala chooses to represent 
closures. 

Try Scala 2.12; its implementation of closures uses the lambda metafactory and 
lots of this goes away.
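
If moving to 2.12 isn't an option right away, one workaround that may help is 
to build the function passed to `map` in a scope where the non-serializable 
value is not visible at all, so there is no `$outer` chain for the cleaner to 
miss. Below is a minimal sketch without Spark. The helper `appendSuffix` and 
the `isSerializable` check are hypothetical names, not from the report; the 
check uses plain Java serialization as a stand-in for what the trace shows 
`ClosureCleaner.ensureSerializable` doing through `JavaSerializerInstance`.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureSketch {
  // Deliberately not Serializable, like Thingy in the report.
  class Thingy {
    def run[R](block: () => R): R = block()
  }

  // Hypothetical check: true when plain Java serialization accepts the object.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // Hypothetical helper: the function is created where only `suffix` is in
  // scope, so it cannot pick up a reference to an enclosing closure.
  def appendSuffix(suffix: String): String => String = r => r + suffix

  def main(args: Array[String]): Unit = {
    val thingy = new Thingy
    val someFunc = () => {
      val foo = "foo"
      thingy.run(() => {
        // In the report, the equivalent of `f` is what gets passed to map.
        val f = appendSuffix(foo)
        isSerializable(f)
      })
    }
    println(someFunc())
  }
}
```

In the reported code this would amount to writing 
`landedData.map(appendSuffix(foo))` instead of the inline `r => r + foo`. I 
have not run that against Spark 2.3.1, so treat it as something to try rather 
than a confirmed fix.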

I agree it's weird, but what do you propose?

> Closure Cleaner Bug
> -------------------
>
>                 Key: SPARK-26534
>                 URL: https://issues.apache.org/jira/browse/SPARK-26534
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.1
>            Reporter: sam
>            Priority: Minor
>
> I've found a strange combination of closures where the closure cleaner 
> doesn't seem to be smart enough to figure out how to remove a reference that 
> is not used. I.e. we get a `org.apache.spark.SparkException: Task not 
> serializable` for a Task that is perfectly serializable.  
>  
> In the example below, the only `val` that is actually needed for the closure 
> of the `map` is `foo`, but it tries to serialise `thingy`.  What is odd is 
> that changing this code in any of several subtle ways eliminates the error, 
> which I've tried to highlight using inline comments.
>  
> {code:java}
> import org.apache.spark.sql._
> object Test {
>   val sparkSession: SparkSession =
>     SparkSession.builder.master("local").appName("app").getOrCreate()
>   def apply(): Unit = {
>     import sparkSession.implicits._
>     val landedData: Dataset[String] =
>       sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>     // thingy has to be in this outer scope to reproduce; if it is declared
>     // inside someFunc, cannot reproduce
>     val thingy: Thingy = new Thingy
>     // If not wrapped in someFunc, cannot reproduce
>     val someFunc = () => {
>       // If foo is not referenced inside the closure (e.g. just use the
>       // identity function), cannot reproduce
>       val foo: String = "foo"
>       thingy.run(block = () => {
>         landedData.map(r => {
>           r + foo
>         })
>         .count()
>       })
>     }
>     someFunc()
>   }
> }
> class Thingy {
>   def run[R](block: () => R): R = {
>     block()
>   }
> }
> {code}
> The full trace when run in `sbt console`:
> {code}
> scala> class Thingy {
>      |   def run[R](block: () => R): R = {
>      |     block()
>      |   }
>      | }
> defined class Thingy
> scala> 
> scala> object Test {
>      |   val sparkSession: SparkSession =
>      |     SparkSession.builder.master("local").appName("app").getOrCreate()
>      | 
>      |   def apply(): Unit = {
>      |     import sparkSession.implicits._
>      | 
>      |     val landedData: Dataset[String] = sparkSession.sparkContext.makeRDD(Seq("foo", "bar")).toDS()
>      | 
>      |     // thingy has to be in this outer scope to reproduce, if in someFunc, cannot reproduce
>      |     val thingy: Thingy = new Thingy
>      | 
>      |     // If not wrapped in someFunc cannot reproduce
>      |     val someFunc = () => {
>      |       // If don't reference this foo inside the closure (e.g. just use identity function) cannot reproduce
>      |       val foo: String = "foo"
>      | 
>      |       thingy.run(block = () => {
>      |         landedData.map(r => {
>      |           r + foo
>      |         })
>      |         .count()
>      |       })
>      |     }
>      | 
>      |     someFunc()
>      | 
>      |   }
>      | }
> defined object Test
> scala> 
> scala> 
> scala> Test.apply()
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> 19/01/07 11:27:19 INFO SparkContext: Running Spark version 2.3.1
> 19/01/07 11:27:20 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> 19/01/07 11:27:20 INFO SparkContext: Submitted application: app
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls to: sams
> 19/01/07 11:27:20 INFO SecurityManager: Changing view acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: Changing modify acls groups to: 
> 19/01/07 11:27:20 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users  with view permissions: Set(sams); groups 
> with view permissions: Set(); users  with modify permissions: Set(sams); 
> groups with modify permissions: Set()
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'sparkDriver' on 
> port 54066.
> 19/01/07 11:27:20 INFO SparkEnv: Registering MapOutputTracker
> 19/01/07 11:27:20 INFO SparkEnv: Registering BlockManagerMaster
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: Using 
> org.apache.spark.storage.DefaultTopologyMapper for getting topology 
> information
> 19/01/07 11:27:20 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint 
> up
> 19/01/07 11:27:20 INFO DiskBlockManager: Created local directory at 
> /private/var/folders/x9/r21b5ttd1wx8zq9qtckfp411n7085c/T/blockmgr-c35bdd46-4804-427b-a513-ee8778814f88
> 19/01/07 11:27:20 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
> 19/01/07 11:27:20 INFO SparkEnv: Registering OutputCommitCoordinator
> 19/01/07 11:27:20 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 19/01/07 11:27:20 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at 
> http://10.197.196.44:4040
> 19/01/07 11:27:21 INFO Executor: Starting executor ID driver on host localhost
> 19/01/07 11:27:21 INFO Utils: Successfully started service 
> 'org.apache.spark.network.netty.NettyBlockTransferService' on port 54067.
> 19/01/07 11:27:21 INFO NettyBlockTransferService: Server created on 
> 10.197.196.44:54067
> 19/01/07 11:27:21 INFO BlockManager: Using 
> org.apache.spark.storage.RandomBlockReplicationPolicy for block replication 
> policy
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registering BlockManager 
> BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMasterEndpoint: Registering block manager 
> 10.197.196.44:54067 with 366.3 MB RAM, BlockManagerId(driver, 10.197.196.44, 
> 54067, None)
> 19/01/07 11:27:21 INFO BlockManagerMaster: Registered BlockManager 
> BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:21 INFO BlockManager: Initialized BlockManager: 
> BlockManagerId(driver, 10.197.196.44, 54067, None)
> 19/01/07 11:27:23 INFO SharedState: Setting hive.metastore.warehouse.dir 
> ('null') to the value of spark.sql.warehouse.dir 
> ('file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/').
> 19/01/07 11:27:23 INFO SharedState: Warehouse path is 
> 'file:/Users/sams/src/asos-datalake-staging/staging/spark-warehouse/'.
> 19/01/07 11:27:23 INFO StateStoreCoordinatorRef: Registered 
> StateStoreCoordinator endpoint
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 257.291388 ms
> 19/01/07 11:27:24 INFO CodeGenerator: Code generated in 33.985273 ms
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:345)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:335)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:159)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2299)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:844)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:843)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:843)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:608)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
>   at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
>   at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
>   at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
>   at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
>   at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3253)
>   at org.apache.spark.sql.Dataset.count(Dataset.scala:2769)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply$mcJ$sp(<console>:36)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
>   at Test$$anonfun$1$$anonfun$apply$mcJ$sp$1.apply(<console>:32)
>   at Thingy.run(<console>:16)
>   at Test$$anonfun$1.apply$mcJ$sp(<console>:32)
>   at Test$.apply(<console>:40)
>   ... 40 elided
> Caused by: java.io.NotSerializableException: Thingy
> Serialization stack:
>       - object not serializable (class: Thingy, value: Thingy@679723a6)
>       - field (class: Test$$anonfun$1, name: thingy$1, type: class Thingy)
>       - object (class Test$$anonfun$1, <function0>)
>       - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, name: $outer, type: class Test$$anonfun$1)
>       - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1, <function0>)
>       - field (class: Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, name: $outer, type: class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1)
>       - object (class Test$$anonfun$1$$anonfun$apply$mcJ$sp$1$$anonfun$apply$mcJ$sp$2, <function1>)
>       - element of array (index: 0)
>       - array (class [Ljava.lang.Object;, size 3)
>       - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, name: references$1, type: class [Ljava.lang.Object;)
>       - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10, <function2>)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
>   ... 90 more
> scala> 
> {code}


