I think this should be fixed in both master and branch-1.6 now.  We'll look
at doing 1.6.1 sometime in the near future.  Please let me know if you can
reproduce any issues there.

On Mon, Jan 11, 2016 at 9:46 AM, Wail Alkowaileet <wael....@gmail.com>
wrote:

> Hello Michael,
>
> Sorry for the late replay .. I was crossing the world the last few days.
> I actually tried both ... REPEL and SparkApp. The reported exception was
> in App.
>
> Unfortunately the data I have is not for distribution ... sorry about
> that.
> I saw it has been resolved.. I will try to reproduce the same error with
> dummy data.
>
> Thanks!
>
> On Thu, Jan 7, 2016 at 2:03 PM, Michael Armbrust <mich...@databricks.com>
> wrote:
>
>> Were you running in the REPL?
>>
>> On Thu, Jan 7, 2016 at 10:34 AM, Michael Armbrust <mich...@databricks.com
>> > wrote:
>>
>>> Thanks for providing a great description.  I've opened
>>> https://issues.apache.org/jira/browse/SPARK-12696
>>>
>>> I'm actually getting a different error (running in notebooks though).
>>> Something seems wrong either way.
>>>
>>>>
>>>> *P.S* mapping by name with case classes doesn't work if the order of
>>>> the fields of a case class doesn't match with the order of the DataFrame's
>>>> schema.
>>>
>>>
>>> We have tests for reordering
>>> <https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala#L97>
>>>  can
>>> you provide a smaller reproduction of this problem?
>>>
>>> On Wed, Jan 6, 2016 at 10:27 PM, Wail Alkowaileet <wael....@gmail.com>
>>> wrote:
>>>
>>>> Hey,
>>>>
>>>> I got an error when trying to map a Dataset df.as[CLASS] when I have
>>>> some nested case classes
>>>> I'm not sure if it's a bug ... or I did something wrong... or I missed
>>>> some configuration.
>>>>
>>>>
>>>> I did the following:
>>>>
>>>> *input snapshot*
>>>>
>>>> {
>>>>   "count": "string",
>>>>   "name": [{
>>>>     "addr_no": "string",
>>>>     "dais_id": "string",
>>>>     "display_name": "string",
>>>>     "first_name": "string",
>>>>     "full_name": "string",
>>>>     "last_name": "string",
>>>>     "r_id": "string",
>>>>     "reprint": "string",
>>>>     "role": "string",
>>>>     "seq_no": "string",
>>>>     "suffix": "string",
>>>>     "wos_standard": "string"
>>>>   }]
>>>> }
>>>>
>>>> *Case classes:*
>>>>
>>>> case class listType1(addr_no:String, dais_id:String, display_name:String, 
>>>> first_name:String, full_name:String, last_name:String, r_id:String, 
>>>> reprint:String, role:String, seq_no:String, suffix:String, 
>>>> wos_standard:String)
>>>> case class DatasetType1(count:String, name:Array[listType1])
>>>>
>>>> *Schema:*
>>>> root
>>>>  |-- count: string (nullable = true)
>>>>  |-- name: array (nullable = true)
>>>>  |    |-- element: struct (containsNull = true)
>>>>  |    |    |-- addr_no: string (nullable = true)
>>>>  |    |    |-- dais_id: string (nullable = true)
>>>>  |    |    |-- display_name: string (nullable = true)
>>>>  |    |    |-- first_name: string (nullable = true)
>>>>  |    |    |-- full_name: string (nullable = true)
>>>>  |    |    |-- last_name: string (nullable = true)
>>>>  |    |    |-- r_id: string (nullable = true)
>>>>  |    |    |-- reprint: string (nullable = true)
>>>>  |    |    |-- role: string (nullable = true)
>>>>  |    |    |-- seq_no: string (nullable = true)
>>>>  |    |    |-- suffix: string (nullable = true)
>>>>  |    |    |-- wos_standard: string (nullable = true)
>>>>
>>>> *Scala code:*
>>>>
>>>> import sqlContext.implicits._
>>>>
>>>> val ds = df.as[DatasetType1]
>>>>
>>>> //Taking first() works fine
>>>> println(ds.first().count)
>>>>
>>>> //map() then first throws exception
>>>> println(ds.map(x => x.count).first())
>>>>
>>>>
>>>> *Exception Message:*
>>>> Exception in thread "main" org.apache.spark.SparkException: Task not
>>>> serializable
>>>> at
>>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>>>> at
>>>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>>>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>>>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>>>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>>>> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>>>> at
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>> at
>>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>>>> at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>>>> at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
>>>> at main.main$.testAsterixRDDWithSparkSQL(main.scala:63)
>>>> at main.main$.main(main.scala:70)
>>>> at main.main.main(main.scala)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:497)
>>>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>>>> Caused by: java.io.NotSerializableException:
>>>> scala.reflect.internal.Symbols$PackageClassSymbol
>>>> Serialization stack:
>>>> - object not serializable (class:
>>>> scala.reflect.internal.Symbols$PackageClassSymbol, value: package main)
>>>> - field (class: scala.reflect.internal.Types$ThisType, name: sym, type:
>>>> class scala.reflect.internal.Symbols$Symbol)
>>>> - object (class scala.reflect.internal.Types$UniqueThisType, main.type)
>>>> - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type:
>>>> class scala.reflect.internal.Types$Type)
>>>> - object (class scala.reflect.internal.Types$TypeRef$$anon$6,
>>>> main.listType1)
>>>> - field (class:
>>>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2,
>>>> name: elementType$1, type: class scala.reflect.api.Types$TypeApi)
>>>> - object (class
>>>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2,
>>>> <function0>)
>>>> - field (class:
>>>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1,
>>>> name: $outer, type: class
>>>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2)
>>>> - object (class
>>>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1,
>>>> <function1>)
>>>> - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects,
>>>> name: function, type: interface scala.Function1)
>>>> - object (class org.apache.spark.sql.catalyst.expressions.MapObjects,
>>>> mapobjects(<function1>,cast(name#1 as
>>>> array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)))
>>>> - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name:
>>>> targetObject, type: class
>>>> org.apache.spark.sql.catalyst.expressions.Expression)
>>>> - object (class org.apache.spark.sql.catalyst.expressions.Invoke,
>>>> invoke(mapobjects(<function1>,cast(name#1 as
>>>> array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class
>>>> [Lmain.listType1;)))
>>>> - writeObject data (class: scala.collection.immutable.$colon$colon)
>>>> - object (class scala.collection.immutable.$colon$colon,
>>>> List(invoke(count#0,toString,ObjectType(class java.lang.String)),
>>>> invoke(mapobjects(<function1>,cast(name#1 as
>>>> array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class
>>>> [Lmain.listType1;))))
>>>> - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance,
>>>> name: arguments, type: interface scala.collection.Seq)
>>>> - object (class org.apache.spark.sql.catalyst.expressions.NewInstance,
>>>> newinstance(class
>>>> main.DatasetType1,invoke(count#0,toString,ObjectType(class
>>>> java.lang.String)),invoke(mapobjects(<function1>,cast(name#1 as
>>>> array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class
>>>> [Lmain.listType1;)),false,ObjectType(class main.DatasetType1),None))
>>>> - field (class:
>>>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name:
>>>> fromRowExpression, type: class
>>>> org.apache.spark.sql.catalyst.expressions.Expression)
>>>> - object (class
>>>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[count[0]:
>>>> string, name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b):
>>>> array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>])
>>>> - field (class: org.apache.spark.sql.execution.MapPartitions, name:
>>>> tEncoder, type: class
>>>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
>>>> - object (class org.apache.spark.sql.execution.MapPartitions,
>>>> !MapPartitions <function1>, class[count[0]: string,
>>>> name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b):
>>>> array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>],
>>>> class[value[0]: string], [value#10]
>>>> +- ConvertToSafe
>>>>    +- Scan JSONRelation[count#0,name#1] InputPaths:
>>>> )
>>>> - field (class:
>>>> org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer,
>>>> type: class org.apache.spark.sql.execution.MapPartitions)
>>>> - object (class
>>>> org.apache.spark.sql.execution.MapPartitions$$anonfun$8, <function1>)
>>>> - field (class:
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22,
>>>> type: interface scala.Function1)
>>>> - object (class
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
>>>> - field (class:
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,
>>>> name: $outer, type: class
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
>>>> - object (class
>>>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,
>>>> <function3>)
>>>> - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type:
>>>> interface scala.Function3)
>>>> - object (class org.apache.spark.rdd.MapPartitionsRDD,
>>>> MapPartitionsRDD[6] at collect at main.scala:63)
>>>> - field (class: org.apache.spark.NarrowDependency, name: _rdd, type:
>>>> class org.apache.spark.rdd.RDD)
>>>> - object (class org.apache.spark.OneToOneDependency,
>>>> org.apache.spark.OneToOneDependency@7793b55d)
>>>> - writeObject data (class: scala.collection.immutable.$colon$colon)
>>>> - object (class scala.collection.immutable.$colon$colon,
>>>> List(org.apache.spark.OneToOneDependency@7793b55d))
>>>> - field (class: org.apache.spark.rdd.RDD, name:
>>>> org$apache$spark$rdd$RDD$$dependencies_, type: interface
>>>> scala.collection.Seq)
>>>> - object (class org.apache.spark.rdd.MapPartitionsRDD,
>>>> MapPartitionsRDD[7] at collect at main.scala:63)
>>>> - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name:
>>>> $outer, type: class org.apache.spark.rdd.RDD)
>>>> - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1,
>>>> <function0>)
>>>> - field (class:
>>>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer,
>>>> type: class org.apache.spark.rdd.RDD$$anonfun$collect$1)
>>>> - object (class
>>>> org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, <function1>)
>>>> at
>>>> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>>>> at
>>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>>>> at
>>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>>>> at
>>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>>>> ... 19 more
>>>>
>>>>
>>>>
>>>> *P.S* mapping by name with case classes doesn't work if the order of
>>>> the fields of a case class doesn't match with the order of the DataFrame's
>>>> schema.
>>>>
>>>>
>>>> --
>>>>
>>>> *Regards,*
>>>> Wail Alkowaileet
>>>>
>>>
>>>
>>
>
>
> --
>
> *Regards,*
> Wail Alkowaileet
>

Reply via email to