[ https://issues.apache.org/jira/browse/SPARK-12696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust resolved SPARK-12696.
--------------------------------------
    Resolution: Fixed
    Fix Version/s: 1.6.1

Issue resolved by pull request 10650
[https://github.com/apache/spark/pull/10650]

> Dataset serialization error
> ---------------------------
>
>                 Key: SPARK-12696
>                 URL: https://issues.apache.org/jira/browse/SPARK-12696
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>            Reporter: Michael Armbrust
>            Assignee: Michael Armbrust
>            Priority: Blocker
>             Fix For: 1.6.1
>
>
> input snapshot
> {code}
> {
>   "count": "string",
>   "name": [{
>     "addr_no": "string",
>     "dais_id": "string",
>     "display_name": "string",
>     "first_name": "string",
>     "full_name": "string",
>     "last_name": "string",
>     "r_id": "string",
>     "reprint": "string",
>     "role": "string",
>     "seq_no": "string",
>     "suffix": "string",
>     "wos_standard": "string"
>   }]
> }
> {code}
> Case classes:
> {code}
> case class listType1(addr_no:String, dais_id:String, display_name:String,
>   first_name:String, full_name:String, last_name:String, r_id:String,
>   reprint:String, role:String, seq_no:String, suffix:String,
>   wos_standard:String)
> case class DatasetType1(count:String, name:Array[listType1])
> {code}
> Schema:
> {code}
> root
>  |-- count: string (nullable = true)
>  |-- name: array (nullable = true)
>  |    |-- element: struct (containsNull = true)
>  |    |    |-- addr_no: string (nullable = true)
>  |    |    |-- dais_id: string (nullable = true)
>  |    |    |-- display_name: string (nullable = true)
>  |    |    |-- first_name: string (nullable = true)
>  |    |    |-- full_name: string (nullable = true)
>  |    |    |-- last_name: string (nullable = true)
>  |    |    |-- r_id: string (nullable = true)
>  |    |    |-- reprint: string (nullable = true)
>  |    |    |-- role: string (nullable = true)
>  |    |    |-- seq_no: string (nullable = true)
>  |    |    |-- suffix: string (nullable = true)
>  |    |    |-- wos_standard: string (nullable = true)
> {code}
> Scala code:
> {code}
> import sqlContext.implicits._
> val ds = df.as[DatasetType1]
> //Taking first() works fine
> println(ds.first().count)
> //map() then first throws exception
> println(ds.map(x => x.count).first())
> {code}
> {code}
> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
> at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
> at main.main$.testAsterixRDDWithSparkSQL(main.scala:63)
> at main.main$.main(main.scala:70)
> at main.main.main(main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
> Serialization stack:
> - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package main)
> - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
> - object (class scala.reflect.internal.Types$UniqueThisType, main.type)
> - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$TypeRef$$anon$6, main.listType1)
> - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2, name: elementType$1, type: class scala.reflect.api.Types$TypeApi)
> - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2, <function0>)
> - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1, name: $outer, type: class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2)
> - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1, <function1>)
> - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
> - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)))
> - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class [Lmain.listType1;)))
> - writeObject data (class: scala.collection.immutable.$colon$colon)
> - object (class scala.collection.immutable.$colon$colon, List(invoke(count#0,toString,ObjectType(class java.lang.String)), invoke(mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class [Lmain.listType1;))))
> - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq)
> - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class main.DatasetType1,invoke(count#0,toString,ObjectType(class java.lang.String)),invoke(mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class [Lmain.listType1;)),false,ObjectType(class main.DatasetType1),None))
> - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[count[0]: string, name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b): array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>])
> - field (class: org.apache.spark.sql.execution.MapPartitions, name: tEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
> - object (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions <function1>, class[count[0]: string, name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b): array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>], class[value[0]: string], [value#10]
> +- ConvertToSafe
>    +- Scan JSONRelation[count#0,name#1] InputPaths:
> )
> - field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions)
> - object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, <function1>)
> - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
> - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
> - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
> - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, <function3>)
> - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
> - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[6] at collect at main.scala:63)
> - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
> - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@7793b55d)
> - writeObject data (class: scala.collection.immutable.$colon$colon)
> - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@7793b55d))
> - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
> - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[7] at collect at main.scala:63)
> - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
> - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>)
> - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
> - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, <function1>)
> at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
> at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
> at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
> at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
> ... 19 more
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
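For reference, a self-contained version of the reproduction quoted above (a sketch assuming the Spark 1.6.0 APIs shown in the report; the object name, master URL, and input path are placeholders, not taken from the original report):

{code}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Reporter's case classes, defined at the top level of the file.
case class listType1(addr_no: String, dais_id: String, display_name: String,
    first_name: String, full_name: String, last_name: String, r_id: String,
    reprint: String, role: String, seq_no: String, suffix: String,
    wos_standard: String)

case class DatasetType1(count: String, name: Array[listType1])

object Repro {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SPARK-12696-repro").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // "input.json" is a placeholder for a file of records shaped like the snapshot above.
    val df = sqlContext.read.json("input.json")
    val ds = df.as[DatasetType1]

    // Taking first() directly works fine.
    println(ds.first().count)

    // map() followed by first() throws "Task not serializable" on 1.6.0.
    println(ds.map(x => x.count).first())

    sc.stop()
  }
}
{code}

Run against a JSON file shaped like the snapshot, this should hit the "Task not serializable" failure on 1.6.0; per the resolution above, the same program is expected to work on 1.6.1.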