Michael Armbrust created SPARK-12696:
----------------------------------------

             Summary: Dataset serialization error
                 Key: SPARK-12696
                 URL: https://issues.apache.org/jira/browse/SPARK-12696
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.6.0
            Reporter: Michael Armbrust
            Assignee: Michael Armbrust


Input snapshot:
{code}
{
  "count": "string",
  "name": [{
    "addr_no": "string",
    "dais_id": "string",
    "display_name": "string",
    "first_name": "string",
    "full_name": "string",
    "last_name": "string",
    "r_id": "string",
    "reprint": "string",
    "role": "string",
    "seq_no": "string",
    "suffix": "string",
    "wos_standard": "string"
  }]
}
{code}

Case classes:
{code}
case class listType1(addr_no:String, dais_id:String, display_name:String, 
first_name:String, full_name:String, last_name:String, r_id:String, 
reprint:String, role:String, seq_no:String, suffix:String, wos_standard:String)
case class DatasetType1(count:String, name:Array[listType1])
{code}

Schema:
{code}
root
 |-- count: string (nullable = true)
 |-- name: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- addr_no: string (nullable = true)
 |    |    |-- dais_id: string (nullable = true)
 |    |    |-- display_name: string (nullable = true)
 |    |    |-- first_name: string (nullable = true)
 |    |    |-- full_name: string (nullable = true)
 |    |    |-- last_name: string (nullable = true)
 |    |    |-- r_id: string (nullable = true)
 |    |    |-- reprint: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |    |    |-- seq_no: string (nullable = true)
 |    |    |-- suffix: string (nullable = true)
 |    |    |-- wos_standard: string (nullable = true)
{code}

Scala code:
{code}
import sqlContext.implicits._
val ds = df.as[DatasetType1]

//Taking first() works fine
println(ds.first().count)

// Calling map() and then first() throws the exception shown below
println(ds.map(x => x.count).first())
{code}

{code}
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at 
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1857)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
        at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
        at org.apache.spark.rdd.RDD.collect(RDD.scala:926)
        at org.apache.spark.sql.Dataset.collect(Dataset.scala:668)
        at main.main$.testAsterixRDDWithSparkSQL(main.scala:63)
        at main.main$.main(main.scala:70)
        at main.main.main(main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Caused by: java.io.NotSerializableException: scala.reflect.internal.Symbols$PackageClassSymbol
Serialization stack:
        - object not serializable (class: 
scala.reflect.internal.Symbols$PackageClassSymbol, value: package main)
        - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: 
class scala.reflect.internal.Symbols$Symbol)
        - object (class scala.reflect.internal.Types$UniqueThisType, main.type)
        - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: 
class scala.reflect.internal.Types$Type)
        - object (class scala.reflect.internal.Types$TypeRef$$anon$6, 
main.listType1)
        - field (class: 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2,
 name: elementType$1, type: class scala.reflect.api.Types$TypeApi)
        - object (class 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2,
 <function0>)
        - field (class: 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1,
 name: $outer, type: class 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2)
        - object (class 
org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1,
 <function1>)
        - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, 
name: function, type: interface scala.Function1)
        - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, 
mapobjects(<function1>,cast(name#1 as 
array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)))
        - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: 
targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class org.apache.spark.sql.catalyst.expressions.Invoke, 
invoke(mapobjects(<function1>,cast(name#1 as 
array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class
 [Lmain.listType1;)))
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon, 
List(invoke(count#0,toString,ObjectType(class java.lang.String)), 
invoke(mapobjects(<function1>,cast(name#1 as 
array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class
 [Lmain.listType1;))))
        - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, 
name: arguments, type: interface scala.collection.Seq)
        - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, 
newinstance(class main.DatasetType1,invoke(count#0,toString,ObjectType(class 
java.lang.String)),invoke(mapobjects(<function1>,cast(name#1 as 
array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>),StructField(display_name,StringType,true),StructField(first_name,StringType,true),StructField(full_name,StringType,true),StructField(reprint,StringType,true),StructField(role,StringType,true),StructField(wos_standard,StringType,true),StructField(last_name,StringType,true),StructField(dais_id,StringType,true),StructField(seq_no,StringType,true),StructField(suffix,StringType,true),StructField(r_id,StringType,true),StructField(addr_no,StringType,true)),array,ObjectType(class
 [Lmain.listType1;)),false,ObjectType(class main.DatasetType1),None))
        - field (class: 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: 
fromRowExpression, type: class 
org.apache.spark.sql.catalyst.expressions.Expression)
        - object (class 
org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[count[0]: 
string, name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b): 
array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>])
        - field (class: org.apache.spark.sql.execution.MapPartitions, name: 
tEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
        - object (class org.apache.spark.sql.execution.MapPartitions, 
!MapPartitions <function1>, class[count[0]: string, 
name#ExprId(4,879d493d-efa1-4799-8fc4-5872ccb3b07b): 
array<struct<display_name:string,first_name:string,full_name:string,reprint:string,role:string,wos_standard:string,last_name:string,dais_id:string,seq_no:string,suffix:string,r_id:string,addr_no:string>>],
 class[value[0]: string], [value#10]
+- ConvertToSafe
   +- Scan JSONRelation[count#0,name#1] InputPaths: 
)
        - field (class: 
org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: 
class org.apache.spark.sql.execution.MapPartitions)
        - object (class 
org.apache.spark.sql.execution.MapPartitions$$anonfun$8, <function1>)
        - field (class: 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: 
interface scala.Function1)
        - object (class 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, <function0>)
        - field (class: 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, 
name: $outer, type: class 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
        - object (class 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, 
<function3>)
        - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: 
interface scala.Function3)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, 
MapPartitionsRDD[6] at collect at main.scala:63)
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: 
class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.OneToOneDependency, 
org.apache.spark.OneToOneDependency@7793b55d)
        - writeObject data (class: scala.collection.immutable.$colon$colon)
        - object (class scala.collection.immutable.$colon$colon, 
List(org.apache.spark.OneToOneDependency@7793b55d))
        - field (class: org.apache.spark.rdd.RDD, name: 
org$apache$spark$rdd$RDD$$dependencies_, type: interface scala.collection.Seq)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, 
MapPartitionsRDD[7] at collect at main.scala:63)
        - field (class: org.apache.spark.rdd.RDD$$anonfun$collect$1, name: 
$outer, type: class org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.rdd.RDD$$anonfun$collect$1, 
<function0>)
        - field (class: 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, name: $outer, type: 
class org.apache.spark.rdd.RDD$$anonfun$collect$1)
        - object (class 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12, <function1>)
        at 
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at 
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at 
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
        at 
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 19 more
{code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to