[ https://issues.apache.org/jira/browse/SPARK-19039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852732#comment-16852732 ]
Junichi Koizumi commented on SPARK-19039:
-------------------------------------------

It seems to be working fine on my console.

{quote}
scala> df.where(rowHasMyNumber).show()
19/05/30 18:02:52 INFO SparkContext: Starting job: show at <console>:41
19/05/30 18:02:52 INFO DAGScheduler: Got job 0 (show at <console>:41) with 1 output partitions
19/05/30 18:02:52 INFO DAGScheduler: Final stage: ResultStage 0 (show at <console>:41)
19/05/30 18:02:52 INFO DAGScheduler: Parents of final stage: List()
19/05/30 18:02:52 INFO DAGScheduler: Missing parents: List()
19/05/30 18:02:52 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[10] at show at <console>:41), which has no missing parents
19/05/30 18:02:52 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 11.1 KB, free 11.1 KB)
19/05/30 18:02:52 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 4.9 KB, free 16.0 KB)
19/05/30 18:02:52 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:50755 (size: 4.9 KB, free: 511.1 MB)
19/05/30 18:02:52 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1008
19/05/30 18:02:52 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[10] at show at <console>:41)
19/05/30 18:02:52 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
19/05/30 18:02:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2452 bytes)
19/05/30 18:02:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/05/30 18:02:57 INFO GeneratePredicate: Code generated in 3051.980036 ms
19/05/30 18:02:57 INFO GenerateUnsafeProjection: Code generated in 76.257507 ms
19/05/30 18:02:57 INFO GenerateSafeProjection: Code generated in 78.801312 ms
19/05/30 18:02:57 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1617 bytes result sent to driver
19/05/30 18:02:57 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4249 ms on localhost (1/1)
19/05/30 18:02:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/05/30 18:02:57 INFO DAGScheduler: ResultStage 0 (show at <console>:41) finished in 4.857 s
19/05/30 18:02:57 INFO DAGScheduler: Job 0 finished: show at <console>:41, took 5.593930 s
19/05/30 18:02:58 INFO SparkContext: Starting job: show at <console>:41
19/05/30 18:02:58 INFO DAGScheduler: Got job 1 (show at <console>:41) with 1 output partitions
19/05/30 18:02:58 INFO DAGScheduler: Final stage: ResultStage 1 (show at <console>:41)
19/05/30 18:02:58 INFO DAGScheduler: Parents of final stage: List()
19/05/30 18:02:58 INFO DAGScheduler: Missing parents: List()
19/05/30 18:02:58 INFO DAGScheduler: Submitting ResultStage 1 (MapPartitionsRDD[10] at show at <console>:41), which has no missing parents
19/05/30 18:02:58 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 11.1 KB, free 27.0 KB)
19/05/30 18:02:58 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 4.9 KB, free 31.9 KB)
19/05/30 18:02:58 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:50755 (size: 4.9 KB, free: 511.1 MB)
19/05/30 18:02:58 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:1008
19/05/30 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[10] at show at <console>:41)
19/05/30 18:02:58 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19/05/30 18:02:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 2451 bytes)
19/05/30 18:02:58 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
19/05/30 18:02:58 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1574 bytes result sent to driver
19/05/30 18:02:58 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 42 ms on localhost (1/1)
19/05/30 18:02:58 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/05/30 18:02:58 INFO DAGScheduler: ResultStage 1 (show at <console>:41) finished in 0.044 s
19/05/30 18:02:58 INFO DAGScheduler: Job 1 finished: show at <console>:41, took 0.087083 s
+------+--+
|a|b|
+------+--+
|hi|1|
|there|2|
|the|3|
+------+--+
{quote}

> UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL
> ------------------------------------------------------------------
>
>                 Key: SPARK-19039
>                 URL: https://issues.apache.org/jira/browse/SPARK-19039
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.3, 2.0.2, 2.1.0, 2.3.0
>            Reporter: Joseph K. Bradley
>            Priority: Major
>
> When I try this:
> * Define UDF
> * Apply UDF to get Column
> * Use Column in a DataFrame
> I see weird behavior in the spark-shell when using paste mode.
> To reproduce this, paste this into the spark-shell:
> {code}
> import org.apache.spark.sql.functions._
> val df = spark.createDataFrame(Seq(
>   ("hi", 1),
>   ("there", 2),
>   ("the", 3),
>   ("end", 4)
> )).toDF("a", "b")
> val myNumbers = Set(1,2,3)
> val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
> val rowHasMyNumber = tmpUDF($"b")
> df.where(rowHasMyNumber).show()
> {code}
> Stack trace for Spark 2.0 (similar for other versions):
> {code}
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>     - object not serializable (class: org.apache.spark.sql.Column, value: UDF(b))
>     - field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, name: rowHasMyNumber, type: class org.apache.spark.sql.Column)
>     - object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw@6688375a)
>     - field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, name: $outer, type: class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw)
>     - object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, <function1>)
>     - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
>     - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
>     - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
>     - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[1, int, false]))
>     - element of array (index: 1)
>     - array (class [Ljava.lang.Object;, size 2)
>     - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
>     - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> {code}
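For anyone hitting the NotSerializableException above: the serialization stack indicates that the UDF closure pulls in the paste-mode REPL wrapper object whose rowHasMyNumber field (a Column) is not serializable. Below is a minimal workaround sketch, not taken from this report; it assumes a Spark 2.x spark-shell session where spark and the $ column syntax are available, and that keeping the captured value out of the wrapper avoids the capture.

{code}
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
  ("hi", 1),
  ("there", 2),
  ("the", 3),
  ("end", 4)
)).toDF("a", "b")

// Variant 1: keep the captured Set local to the block that builds the UDF, so the
// closure captures only the local value and not the REPL wrapper that also holds
// the non-serializable Column.
val tmpUDF = udf {
  val myNumbers = Set(1, 2, 3)
  (n: Int) => myNumbers.contains(n)
}
df.where(tmpUDF($"b")).show()

// Variant 2: skip the UDF (and closure serialization) entirely and use the
// built-in Column.isin.
df.where($"b".isin(1, 2, 3)).show()
{code}

Both variants sidestep the paste-mode capture described in this ticket rather than fixing the ClosureCleaner itself.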