I think this should be fixed in both master and branch-1.6 now. We'll look at doing 1.6.1 sometime in the near future. Please let me know if you can reproduce any issues there.
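If you want to try it with dummy data, a minimal sketch along the lines of the original report might look like this (the class names, field names, and sample JSON are made up for illustration; it assumes a 1.6 SQLContext named sqlContext and a SparkContext named sc, with the case classes defined at the top level of the file):

case class Inner(first_name: String, last_name: String)
case class Outer(count: String, name: Array[Inner])

import sqlContext.implicits._

val df = sqlContext.read.json(sc.parallelize(Seq(
  """{"count": "1", "name": [{"first_name": "a", "last_name": "b"}]}""")))
val ds = df.as[Outer]

// first() on its own worked even before the fix.
println(ds.first().count)

// map() followed by first() was the step that threw
// "Task not serializable" before the fix.
println(ds.map(_.count).first())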
On Mon, Jan 11, 2016 at 9:46 AM, Wail Alkowaileet <wael....@gmail.com> wrote:

> Hello Michael,
>
> Sorry for the late reply .. I was crossing the world the last few days.
> I actually tried both ... the REPL and a Spark app. The reported
> exception was in the app.
>
> Unfortunately the data I have is not for distribution ... sorry about
> that. I saw it has been resolved .. I will try to reproduce the same
> error with dummy data.
>
> Thanks!
>
> On Thu, Jan 7, 2016 at 2:03 PM, Michael Armbrust <mich...@databricks.com> wrote:
>
>> Were you running in the REPL?
>>
>> On Thu, Jan 7, 2016 at 10:34 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>> Thanks for providing a great description. I've opened
>>> https://issues.apache.org/jira/browse/SPARK-12696
>>>
>>> I'm actually getting a different error (running in notebooks, though).
>>> Something seems wrong either way.
>>>
>>>> *P.S.* Mapping by name with case classes doesn't work if the order of
>>>> the fields of a case class doesn't match the order of the DataFrame's
>>>> schema.
>>>
>>> We have tests for reordering
>>> <https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala#L97>;
>>> can you provide a smaller reproduction of this problem?
>>>
>>> On Wed, Jan 6, 2016 at 10:27 PM, Wail Alkowaileet <wael....@gmail.com> wrote:
>>>
>>>> Hey,
>>>>
>>>> I got an error when trying to map a Dataset df.as[CLASS] when I have
>>>> some nested case classes. I'm not sure if it's a bug ... or I did
>>>> something wrong ... or I missed some configuration.
>>>>
>>>> I did the following:
>>>>
>>>> *Input snapshot:*
>>>>
>>>> {
>>>>   "count": "string",
>>>>   "name": [{
>>>>     "addr_no": "string",
>>>>     "dais_id": "string",
>>>>     "display_name": "string",
>>>>     "first_name": "string",
>>>>     "full_name": "string",
>>>>     "last_name": "string",
>>>>     "r_id": "string",
>>>>     "reprint": "string",
>>>>     "role": "string",
>>>>     "seq_no": "string",
>>>>     "suffix": "string",
>>>>     "wos_standard": "string"
>>>>   }]
>>>> }
>>>>
>>>> *Case classes:*
>>>>
>>>> case class listType1(addr_no: String, dais_id: String,
>>>>   display_name: String, first_name: String, full_name: String,
>>>>   last_name: String, r_id: String, reprint: String, role: String,
>>>>   seq_no: String, suffix: String, wos_standard: String)
>>>> case class DatasetType1(count: String, name: Array[listType1])
>>>>
>>>> *Schema:*
>>>>
>>>> root
>>>>  |-- count: string (nullable = true)
>>>>  |-- name: array (nullable = true)
>>>>  |    |-- element: struct (containsNull = true)
>>>>  |    |    |-- addr_no: string (nullable = true)
>>>>  |    |    |-- dais_id: string (nullable = true)
>>>>  |    |    |-- display_name: string (nullable = true)
>>>>  |    |    |-- first_name: string (nullable = true)
>>>>  |    |    |-- full_name: string (nullable = true)
>>>>  |    |    |-- last_name: string (nullable = true)
>>>>  |    |    |-- r_id: string (nullable = true)
>>>>  |    |    |-- reprint: string (nullable = true)
>>>>  |    |    |-- role: string (nullable = true)
>>>>  |    |    |-- seq_no: string (nullable = true)
>>>>  |    |    |-- suffix: string (nullable = true)
>>>>  |    |    |-- wos_standard: string (nullable = true)
>>>>
>>>> *Scala code:*
>>>>
>>>> import sqlContext.implicits._
>>>>
>>>> val ds = df.as[DatasetType1]
>>>>
>>>> // Taking first() works fine
>>>> println(ds.first().count)
>>>>
>>>> // map() then first() throws an exception
>>>> println(ds.map(x => x.count).first())
>>>>
>>>> *Exception message:*
>>>>
>>>> Exception in thread "main" org.apache.spark.SparkException: Task not serializable
>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>>>>     at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>>>>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>>>>     at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>>     at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>>     at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>>>>     at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
>>>>     at main.main$.testAsterixRDDWithSparkSQL(main.scala:63)
>>>>     at main.main$.main(main.scala:70)
>>>>     at main.main.main(main.scala)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:497)
>>>>     at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>> Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
>>>> Serialization stack:
>>>>     - object not serializable (class: scala.reflect.internal.Symbols$PackageClassSymbol, value: package main)
>>>>     - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
>>>>     - object (class scala.reflect.internal.Types$UniqueThisType, main.type)
>>>>     - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
>>>>     - object (class scala.reflect.internal.Types$TypeRef$$anon$6, main.listType1)
>>>>     - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2, name: elementType$1, type: class scala.reflect.api.Types$TypeApi)
>>>>     - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2, <function0>)
>>>>     - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1, name: $outer, type: class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2)
>>>>     - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1, <function1>)
>>>>     - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
>>>>     - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)))
>>>>     - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
>>>>     - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class [Lmain.listType1;)))
>>>>     - writeObject data (class: scala.collection.immutable.$colon$colon)
>>>>     - object (class scala.collection.immutable.$colon$colon, List(invoke(count#0,toString,ObjectType(class java.lang.String)), invoke(mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class [Lmain.listType1;))))
>>>>     - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq)
>>>>     - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class main.DatasetType1,invoke(count#0,toString,ObjectType(class java.lang.String)),invoke(mapobjects(<function1>,cast(name#1 as array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class [Lmain.listType1;)),false,ObjectType(class main.DatasetType1),None))
>>>>     - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
>>>>     - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[count[0]: string, name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b): array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>])
>>>>     - field (class: org.apache.spark.sql.execution.MapPartitions, name: tEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
>>>>     - object (class org.apache.spark.sql.execution.MapPartitions,
>>>>       !MapPartitions <function1>, class[count[0]: string, name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b): array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>], class[value[0]: string], [value#10]
>>>>       +- ConvertToSafe
>>>>          +- Scan JSONRelation[count#0,name#1] InputPaths:
>>>>       )
>>>>     - field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions)
>>>>     - object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, <function1>)
>>>>     - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
>>>>     - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
>>>>     - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
>>>>     - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, <function3>)
>>>>     - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
>>>>     - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[6] at collect at main.scala:63)
>>>>     - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class org.apache.spark.rdd.RDD)
>>>>     - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@7793b55d)
>>>>     - writeObject data (class: scala.collection.immutable.$colon$colon)
>>>>     - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@7793b55d))
>>>>     - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
>>>>     - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[7] at collect at main.scala:63)
>>>>     - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: $outer, type: class org.apache.spark.rdd.RDD)
>>>>     - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, <function0>)
>>>>     - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
>>>>     - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, <function1>)
>>>>     at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>>>>     ... 19 more
>>>>
>>>> *P.S.* Mapping by name with case classes doesn't work if the order of
>>>> the fields of a case class doesn't match the order of the DataFrame's
>>>> schema.
>>>>
>>>> --
>>>> *Regards,*
>>>> Wail Alkowaileet
>
> --
> *Regards,*
> Wail Alkowaileet
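As for the P.S. about field ordering, a smaller reproduction could be as simple as the sketch below (again with made-up names and data; the JSON reader puts the columns in alphabetical order, so the DataFrame has age before name while the case class declares name first):

case class Person(name: String, age: Long)

import sqlContext.implicits._

// Columns arrive as (age, name), i.e. not in case-class order.
val df = sqlContext.read.json(sc.parallelize(Seq("""{"age": 30, "name": "x"}""")))
val ds = df.as[Person]

// If fields are matched by name, this should print Person(x,30);
// the reported problem is that the order mismatch breaks the mapping.
println(ds.first())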