Hi all, When I tried to convert a Dataset which includes a TimestampType column to a RDD under master branch on spark-shell, I got an error about `org.apache.spark.SparkException: Task not serializable`. How do we convert Dataset includes timestamp to RDD?
Here is the example code and the error: ``` import sqlContext.implicits._ import java.sql.Timestamp import java.text.SimpleDateFormat case class TimestampExample(dt: java.sql.Timestamp) def parse(s: String): Timestamp = { val dateFormat = new SimpleDateFormat("yyyy-MM-dd") new Timestamp(dateFormat.parse(s).getTime()) } val rdd = sc.parallelize(Seq("2015-01-01", "2015-02-01")) val df = rdd.map(x => TimestampExample(parse(x))).toDF() val ds = df.as[TimestampExample] ds.rdd org.apache.spark.SparkException: Task not serializable at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) at org.apache.spark.SparkContext.clean(SparkContext.scala:2061) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707) at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706) at org.apache.spark.sql.Dataset.rdd(Dataset.scala:166) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:60) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:71) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:73) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:75) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:91) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:93) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:95) at $iwC$$iwC$$iwC.<init>(<console>:97) at $iwC$$iwC.<init>(<console>:99) at $iwC.<init>(<console>:101) at <init>(<console>:103) at .<init>(<console>:107) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1045) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1326) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:821) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:852) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:800) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1064) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.io.NotSerializableException: org.apache.spark.sql.catalyst.util.DateTimeUtils$ Serialization stack: - object not serializable (class: org.apache.spark.sql.catalyst.util.DateTimeUtils$, value: org.apache.spark.sql.catalyst.util.DateTimeUtils$@216c782f) - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: staticObject, type: class java.lang.Object) - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(org.apache.spark.sql.catalyst.util.DateTimeUtils$@216c782f,ObjectType(class java.sql.Timestamp),toJavaTimestamp,input[0, TimestampType],true)) - writeObject data (class: scala.collection.immutable.$colon$colon) - object (class scala.collection.immutable.$colon$colon, List(staticinvoke(org.apache.spark.sql.catalyst.util.DateTimeUtils$@216c782f,ObjectType(class java.sql.Timestamp),toJavaTimestamp,input[0, TimestampType],true))) - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq) - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class $iwC$$iwC$TimestampExample,staticinvoke(org.apache.spark.sql.catalyst.util.DateTimeUtils$@216c782f,ObjectType(class java.sql.Timestamp),toJavaTimestamp,input[0, TimestampType],true),false,ObjectType(class $iwC$$iwC$TimestampExample),Some($iwC$$iwC@23b27380))) - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression) - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[dt[0]: timestamp]) - field (class: org.apache.spark.sql.Dataset, name: boundTEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) - object (class org.apache.spark.sql.Dataset, [dt: timestamp]) - field (class: org.apache.spark.sql.Dataset$$anonfun$rdd$1, name: $outer, type: class org.apache.spark.sql.Dataset) - object (class org.apache.spark.sql.Dataset$$anonfun$rdd$1, <function1>) at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301) ... 68 more ``` Thanks, Yu ----- -- Yu Ishikawa -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/How-do-we-convert-a-Dataset-includes-timestamp-columns-to-RDD-tp15682.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org