[ https://issues.apache.org/jira/browse/SPARK-19039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852732#comment-16852732 ]

Junichi Koizumi commented on SPARK-19039:
-------------------------------------------

It seems to be working fine on my console:
{quote} scala> df.where(rowHasMyNumber).show()
 19/05/30 18:02:52 INFO SparkContext: Starting job: show at <console>:41
 19/05/30 18:02:52 INFO DAGScheduler: Got job 0 (show at <console>:41) with 1 
output partitions
 19/05/30 18:02:52 INFO DAGScheduler: Final stage: ResultStage 0 (show at 
<console>:41)
 19/05/30 18:02:52 INFO DAGScheduler: Parents of final stage: List()
 19/05/30 18:02:52 INFO DAGScheduler: Missing parents: List()
 19/05/30 18:02:52 INFO DAGScheduler: Submitting ResultStage 0 
(MapPartitionsRDD[10] at show at <console>:41), which has no missing parents
 19/05/30 18:02:52 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 11.1 KB, free 11.1 KB)
 19/05/30 18:02:52 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes 
in memory (estimated size 4.9 KB, free 16.0 KB)
 19/05/30 18:02:52 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 
localhost:50755 (size: 4.9 KB, free: 511.1 MB)
 19/05/30 18:02:52 INFO SparkContext: Created broadcast 2 from broadcast at 
DAGScheduler.scala:1008
 19/05/30 18:02:52 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 0 (MapPartitionsRDD[10] at show at <console>:41)
 19/05/30 18:02:52 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
 19/05/30 18:02:53 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
localhost, partition 0,PROCESS_LOCAL, 2452 bytes)
 19/05/30 18:02:53 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
 19/05/30 18:02:57 INFO GeneratePredicate: Code generated in 3051.980036 ms
 19/05/30 18:02:57 INFO GenerateUnsafeProjection: Code generated in 76.257507 ms
 19/05/30 18:02:57 INFO GenerateSafeProjection: Code generated in 78.801312 ms
 19/05/30 18:02:57 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1617 
bytes result sent to driver
 19/05/30 18:02:57 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
in 4249 ms on localhost (1/1)
 19/05/30 18:02:57 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
have all completed, from pool
 19/05/30 18:02:57 INFO DAGScheduler: ResultStage 0 (show at <console>:41) 
finished in 4.857 s
 19/05/30 18:02:57 INFO DAGScheduler: Job 0 finished: show at <console>:41, 
took 5.593930 s
 19/05/30 18:02:58 INFO SparkContext: Starting job: show at <console>:41
 19/05/30 18:02:58 INFO DAGScheduler: Got job 1 (show at <console>:41) with 1 
output partitions
 19/05/30 18:02:58 INFO DAGScheduler: Final stage: ResultStage 1 (show at 
<console>:41)
 19/05/30 18:02:58 INFO DAGScheduler: Parents of final stage: List()
 19/05/30 18:02:58 INFO DAGScheduler: Missing parents: List()
 19/05/30 18:02:58 INFO DAGScheduler: Submitting ResultStage 1 
(MapPartitionsRDD[10] at show at <console>:41), which has no missing parents
 19/05/30 18:02:58 INFO MemoryStore: Block broadcast_3 stored as values in 
memory (estimated size 11.1 KB, free 27.0 KB)
 19/05/30 18:02:58 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes 
in memory (estimated size 4.9 KB, free 31.9 KB)
 19/05/30 18:02:58 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on 
localhost:50755 (size: 4.9 KB, free: 511.1 MB)
 19/05/30 18:02:58 INFO SparkContext: Created broadcast 3 from broadcast at 
DAGScheduler.scala:1008
 19/05/30 18:02:58 INFO DAGScheduler: Submitting 1 missing tasks from 
ResultStage 1 (MapPartitionsRDD[10] at show at <console>:41)
 19/05/30 18:02:58 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
 19/05/30 18:02:58 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 
localhost, partition 1,PROCESS_LOCAL, 2451 bytes)
 19/05/30 18:02:58 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
 19/05/30 18:02:58 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1574 
bytes result sent to driver
 19/05/30 18:02:58 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) 
in 42 ms on localhost (1/1)
 19/05/30 18:02:58 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks 
have all completed, from pool
 19/05/30 18:02:58 INFO DAGScheduler: ResultStage 1 (show at <console>:41) 
finished in 0.044 s
 19/05/30 18:02:58 INFO DAGScheduler: Job 1 finished: show at <console>:41, 
took 0.087083 s
 +------+--+
 |     a| b|
 +------+--+
 |    hi| 1|
 | there| 2|
 |   the| 3|
 +------+--+
{quote}
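In case anyone still hits the NotSerializableException in paste mode, the sketch below is a possible workaround (my own assumption, not a confirmed fix): rebinding the captured Set to a local val inside a block keeps the UDF closure from capturing the REPL's paste-mode wrapper object, which is the thing that also holds the non-serializable Column.
{code}
import org.apache.spark.sql.functions._

val df = spark.createDataFrame(Seq(
  ("hi", 1),
  ("there", 2),
  ("the", 3),
  ("end", 4)
)).toDF("a", "b")

val myNumbers = Set(1, 2, 3)

// Rebind to a local val so the lambda captures only the Set itself,
// not the REPL wrapper object that later also holds the Column.
val tmpUDF = {
  val localNumbers = myNumbers
  udf { (n: Int) => localNumbers.contains(n) }
}

val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()
{code}
With that change the closure should only capture a plain Set[Int], so there is nothing non-serializable for ClosureCleaner to reject.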

> UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL
> ------------------------------------------------------------------
>
>                 Key: SPARK-19039
>                 URL: https://issues.apache.org/jira/browse/SPARK-19039
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.3, 2.0.2, 2.1.0, 2.3.0
>            Reporter: Joseph K. Bradley
>            Priority: Major
>
> When I try this:
> * Define UDF
> * Apply UDF to get Column
> * Use Column in a DataFrame
> I run into weird behavior in the spark-shell when using paste mode.
> To reproduce this, paste this into the spark-shell:
> {code}
> import org.apache.spark.sql.functions._
> val df = spark.createDataFrame(Seq(
>   ("hi", 1),
>   ("there", 2),
>   ("the", 3),
>   ("end", 4)
> )).toDF("a", "b")
> val myNumbers = Set(1,2,3)
> val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
> val rowHasMyNumber = tmpUDF($"b")
> df.where(rowHasMyNumber).show()
> {code}
> Stack trace for Spark 2.0 (similar for other versions):
> {code}
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>       at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>       at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
>       at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>       at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
>       at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>       at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
>       at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
>       at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
>       at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
>       at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
>       at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
>       at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
>       at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
>       at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
>       at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>       - object not serializable (class: org.apache.spark.sql.Column, value: 
> UDF(b))
>       - field (class: 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, name: 
> rowHasMyNumber, type: class org.apache.spark.sql.Column)
>       - object (class 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw@6688375a)
>       - field (class: 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, 
> name: $outer, type: class 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw)
>       - object (class 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, 
> <function1>)
>       - field (class: 
> org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, 
> type: interface scala.Function1)
>       - object (class 
> org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
>       - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, 
> name: f, type: interface scala.Function1)
>       - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, 
> UDF(input[1, int, false]))
>       - element of array (index: 1)
>       - array (class [Ljava.lang.Object;, size 2)
>       - field (class: 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: 
> references$1, type: class [Ljava.lang.Object;)
>       - object (class 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
>       at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>       at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
>       at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>       at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>       at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
>       at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
>       at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>       at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>       at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
>       at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>       at 
> org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
>       at 
> org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
>       at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
>       at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
>       at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
>       at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
>       at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
>       at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
>       at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
>       at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
>       at 
> linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
>       at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
