

Re: Dataset throws: Task not serializable

2016-01-07 Thread Michael Armbrust
Were you running in the REPL?
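
The question matters because of what the trace below captures: the
NotSerializableException points at scala.reflect.internal.Symbols$PackageClassSymbol
for "package main", i.e. the encoder's TypeTag is dragging the enclosing scope of
listType1 into the serialized closure. That typically happens when the case classes
are declared in the REPL or nested inside another object. A minimal sketch of a
common workaround, assuming the case classes can be moved to the top level of their
own file (class and field names are taken from the report; the "repro" package name
is made up):

// CaseClasses.scala -- hypothetical separate file; the point is that the case
// classes live at the top level of a package (here "repro", a made-up name),
// not inside object main and not in the REPL, so the encoder does not need a
// reference to the enclosing scope.
package repro

case class listType1(addr_no: String, dais_id: String, display_name: String,
  first_name: String, full_name: String, last_name: String, r_id: String,
  reprint: String, role: String, seq_no: String, suffix: String,
  wos_standard: String)

case class DatasetType1(count: String, name: Array[listType1])

The driver code from the report (import sqlContext.implicits._, df.as[DatasetType1],
then ds.map(x => x.count).first()) would stay the same apart from importing repro._.
Whether this sidesteps the exception in this exact setup is an assumption;
SPARK-12696 tracks the underlying issue.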

On Thu, Jan 7, 2016 at 10:34 AM, Michael Armbrust wrote:

> Thanks for providing a great description.  I've opened
> https://issues.apache.org/jira/browse/SPARK-12696
>
> I'm actually getting a different error (running in notebooks though).
> Something seems wrong either way.
>
>>
>> *P.S* mapping by name with case classes doesn't work if the order of the
>> fields of a case class doesn't match with the order of the DataFrame's
>> schema.
>
>
> We have tests for reordering; can you provide a smaller
> reproduction of this problem?
>
> On Wed, Jan 6, 2016 at 10:27 PM, Wail Alkowaileet wrote:
>
>> Hey,
>>
>> I got an error when trying to map a Dataset (df.as[CLASS]) that contains
>> some nested case classes. I'm not sure if it's a bug, something I did
>> wrong, or some configuration I missed.
>>
>>
>> I did the following:
>>
>> *input snapshot*
>>
>> {
>>   "count": "string",
>>   "name": [{
>> "addr_no": "string",
>> "dais_id": "string",
>> "display_name": "string",
>> "first_name": "string",
>> "full_name": "string",
>> "last_name": "string",
>> "r_id": "string",
>> "reprint": "string",
>> "role": "string",
>> "seq_no": "string",
>> "suffix": "string",
>> "wos_standard": "string"
>>   }]
>> }
>>
>> *Case classes:*
>>
>> case class listType1(addr_no:String, dais_id:String, display_name:String, 
>> first_name:String, full_name:String, last_name:String, r_id:String, 
>> reprint:String, role:String, seq_no:String, suffix:String, 
>> wos_standard:String)
>> case class DatasetType1(count:String, name:Array[listType1])
>>
>> *Schema:*
>> root
>>  |-- count: string (nullable = true)
>>  |-- name: array (nullable = true)
>>  ||-- element: struct (containsNull = true)
>>  |||-- addr_no: string (nullable = true)
>>  |||-- dais_id: string (nullable = true)
>>  |||-- display_name: string (nullable = true)
>>  |||-- first_name: string (nullable = true)
>>  |||-- full_name: string (nullable = true)
>>  |||-- last_name: string (nullable = true)
>>  |||-- r_id: string (nullable = true)
>>  |||-- reprint: string (nullable = true)
>>  |||-- role: string (nullable = true)
>>  |||-- seq_no: string (nullable = true)
>>  |||-- suffix: string (nullable = true)
>>  |||-- wos_standard: string (nullable = true)
>>
>> *Scala code:*
>>
>> import sqlContext.implicits._
>>
>> val ds = df.as[DatasetType1]
>>
>> //Taking first() works fine
>> println(ds.first().count)
>>
>> //map() then first throws exception
>> println(ds.map(x => x.count).first())
>>
>>
>> *Exception Message:*
>> Exception in thread "main" org.apache.spark.SparkException: Task not
>> serializable
>> at
>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>> at
>> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
>> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
>> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> at
>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>> at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
>> at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
>> at main.main$.testAsterixRDDWithSparkSQL(main.scala:63)
>> at main.main$.main(main.scala:70)
>> at main.main.main(main.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
>> Caused by: java.io.NotSerializableException:
>> scala.reflect.internal.Symbols$PackageClassSymbol
>> Serialization stack:
>> - object not serializable (class:
>> scala.reflect.internal.Symbols$PackageClassSymbol, value: package main)
>> - field (class: scala.reflect.internal.Types$ThisType, name: sym, type:
>> class scala.reflect.internal.Symbols$Symbol)
>> - object (class scala.reflect.internal.Types$UniqueThisType, main.type)
>> - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type:
>> class scala.reflect.internal.Types$Type)
>> - object (class scala.reflect.internal.Types$TypeRef$$anon$6,
>> main.listType1)
>> - field 

Re: Dataset throws: Task not serializable

2016-01-07 Thread Michael Armbrust
Thanks for providing a great description.  I've opened
https://issues.apache.org/jira/browse/SPARK-12696

I'm actually getting a different error (running in notebooks though).
Something seems wrong either way.

>
> *P.S* mapping by name with case classes doesn't work if the order of the
> fields of a case class doesn't match with the order of the DataFrame's
> schema.


We have tests for reordering; can you provide a smaller
reproduction of this problem?
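
For what it's worth, a minimal reproduction along those lines might look like the
following sketch, as one would paste into spark-shell or a method body (the Pair
class, column names, and values are made up; the DataFrame's column order is
deliberately the reverse of the case class's field order):

import sqlContext.implicits._

case class Pair(a: String, b: Long)

// Column order in the DataFrame ("b", then "a") deliberately differs from the
// field order of the case class ("a", then "b").
val df = sqlContext.createDataFrame(Seq(("one", 1L), ("two", 2L)))
  .toDF("a", "b")
  .select("b", "a")

val ds = df.as[Pair]  // as[] resolves columns by name, so the reorder should not matter
println(ds.first())   // expected: Pair(one,1)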

On Wed, Jan 6, 2016 at 10:27 PM, Wail Alkowaileet wrote:

> Hey,
>
> I got an error when trying to map a Dataset (df.as[CLASS]) that contains
> some nested case classes. I'm not sure if it's a bug, something I did
> wrong, or some configuration I missed.
>
>
> I did the following:
>
> *input snapshot*
>
> {
>   "count": "string",
>   "name": [{
> "addr_no": "string",
> "dais_id": "string",
> "display_name": "string",
> "first_name": "string",
> "full_name": "string",
> "last_name": "string",
> "r_id": "string",
> "reprint": "string",
> "role": "string",
> "seq_no": "string",
> "suffix": "string",
> "wos_standard": "string"
>   }]
> }
>
> *Case classes:*
>
> case class listType1(addr_no:String, dais_id:String, display_name:String, 
> first_name:String, full_name:String, last_name:String, r_id:String, 
> reprint:String, role:String, seq_no:String, suffix:String, 
> wos_standard:String)
> case class DatasetType1(count:String, name:Array[listType1])
>
> *Schema:*
> root
>  |-- count: string (nullable = true)
>  |-- name: array (nullable = true)
>  ||-- element: struct (containsNull = true)
>  |||-- addr_no: string (nullable = true)
>  |||-- dais_id: string (nullable = true)
>  |||-- display_name: string (nullable = true)
>  |||-- first_name: string (nullable = true)
>  |||-- full_name: string (nullable = true)
>  |||-- last_name: string (nullable = true)
>  |||-- r_id: string (nullable = true)
>  |||-- reprint: string (nullable = true)
>  |||-- role: string (nullable = true)
>  |||-- seq_no: string (nullable = true)
>  |||-- suffix: string (nullable = true)
>  |||-- wos_standard: string (nullable = true)
>
> *Scala code:*
>
> import sqlContext.implicits._
>
> val ds = df.as[DatasetType1]
>
> //Taking first() works fine
> println(ds.first().count)
>
> //map() then first throws exception
> println(ds.map(x => x.count).first())
>
>
> *Exception Message:*
> Exception in thread "main" org.apache.spark.SparkException: Task not
> serializable
> at
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
> at
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
> at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
> at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
> at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
> at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
> at
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
> at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
> at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
> at main.main$.testAsterixRDDWithSparkSQL(main.scala:63)
> at main.main$.main(main.scala:70)
> at main.main.main(main.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:497)
> at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
> Caused by: java.io.NotSerializableException:
> scala.reflect.internal.Symbols$PackageClassSymbol
> Serialization stack:
> - object not serializable (class:
> scala.reflect.internal.Symbols$PackageClassSymbol, value: package main)
> - field (class: scala.reflect.internal.Types$ThisType, name: sym, type:
> class scala.reflect.internal.Symbols$Symbol)
> - object (class scala.reflect.internal.Types$UniqueThisType, main.type)
> - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type:
> class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$TypeRef$$anon$6,
> main.listType1)
> - field (class:
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2,
> name: elementType$1, type: class scala.reflect.api.Types$TypeApi)
> - object (class
>