[ https://issues.apache.org/jira/browse/SPARK-19039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15796740#comment-15796740 ]
Joseph K. Bradley commented on SPARK-19039:
-------------------------------------------

Whoops, thanks! Posted the stack trace.

> UDF ClosureCleaner bug when UDF, col applied in paste mode in REPL
> ------------------------------------------------------------------
>
>                 Key: SPARK-19039
>                 URL: https://issues.apache.org/jira/browse/SPARK-19039
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.3, 2.0.2, 2.1.0
>            Reporter: Joseph K. Bradley
>
> When I try this:
> * Define a UDF
> * Apply the UDF to get a Column
> * Use the Column in a DataFrame
> I see weird behavior in the spark-shell when using paste mode.
> To reproduce it, paste this into the spark-shell:
> {code}
> import org.apache.spark.sql.functions._
>
> val df = spark.createDataFrame(Seq(
>   ("hi", 1),
>   ("there", 2),
>   ("the", 3),
>   ("end", 4)
> )).toDF("a", "b")
>
> val myNumbers = Set(1, 2, 3)
> val tmpUDF = udf { (n: Int) => myNumbers.contains(n) }
> val rowHasMyNumber = tmpUDF($"b")
> df.where(rowHasMyNumber).show()
> {code}
> Stack trace for Spark 2.0 (similar for other versions):
> {code}
> org.apache.spark.SparkException: Task not serializable
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> Caused by: java.io.NotSerializableException: org.apache.spark.sql.Column
> Serialization stack:
>   - object not serializable (class: org.apache.spark.sql.Column, value: UDF(b))
>   - field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, name: rowHasMyNumber, type: class org.apache.spark.sql.Column)
>   - object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw, linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw@6688375a)
>   - field (class: linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, name: $outer, type: class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw)
>   - object (class linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw$$anonfun$1, <function1>)
>   - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, name: func$2, type: interface scala.Function1)
>   - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF$$anonfun$2, <function1>)
>   - field (class: org.apache.spark.sql.catalyst.expressions.ScalaUDF, name: f, type: interface scala.Function1)
>   - object (class org.apache.spark.sql.catalyst.expressions.ScalaUDF, UDF(input[1, int, false]))
>   - element of array (index: 1)
>   - array (class [Ljava.lang.Object;, size 2)
>   - field (class: org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, name: references$1, type: class [Ljava.lang.Object;)
>   - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8, <function2>)
>   at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>   at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
>   at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
>   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
>   at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
>   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
>   at org.apache.spark.SparkContext.clean(SparkContext.scala:2057)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:817)
>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:816)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
>   at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)
>   at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:816)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:364)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:115)
>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:136)
>   at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>   at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133)
>   at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114)
>   at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:225)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:308)
>   at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
>   at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
>   at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2551)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
>   at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
>   at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2581)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:526)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:486)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:495)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw$$iw.<init>(<console>:45)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw$$iw.<init>(<console>:57)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw$$iw.<init>(<console>:59)
>   at linef732283eefe649f4877db916c5ad096f25.$read$$iw.<init>(<console>:61)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print$lzycompute(<console>:7)
>   at linef732283eefe649f4877db916c5ad096f25.$eval$.$print(<console>:6)
> {code}
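The serialization stack above shows the capture chain: the UDF's lambda refers to {{myNumbers}}, so it holds its REPL wrapper object via {{$outer}}, and in paste mode that same wrapper also carries the non-serializable {{Column}} {{rowHasMyNumber}}. Below is a minimal sketch of one commonly suggested workaround (illustrative only, not from this ticket, and assuming a spark-shell session where {{spark}} and {{$}} are in scope): copy the captured value into a block-local val before building the UDF, so the closure no longer needs the wrapper.

{code}
import org.apache.spark.sql.functions._

// Same toy data as the repro above.
val df = spark.createDataFrame(Seq(
  ("hi", 1), ("there", 2), ("the", 3), ("end", 4)
)).toDF("a", "b")

val myNumbers = Set(1, 2, 3)

// Hypothetical workaround: the lambda closes over the block-local
// `localNumbers` directly instead of reaching through $outer, so the
// REPL wrapper object (and its non-serializable Column field) never
// enters the serialized closure.
val tmpUDF = {
  val localNumbers = myNumbers
  udf { (n: Int) => localNumbers.contains(n) }
}

val rowHasMyNumber = tmpUDF($"b")
df.where(rowHasMyNumber).show()
{code}

Note this sidesteps the capture rather than fixing ClosureCleaner itself; the repro as written would presumably still fail in paste mode.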