[ 
https://issues.apache.org/jira/browse/SPARK-12714?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

James Eastwood resolved SPARK-12714.
------------------------------------
    Resolution: Fixed

[~marmbrus] Sorry for taking an age to get back to you -- I've tested this with 
1.6.0-SNAPSHOT and it is indeed working. Thanks :).

> Transforming Dataset with sequences of case classes to RDD causes Task Not 
> Serializable exception
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-12714
>                 URL: https://issues.apache.org/jira/browse/SPARK-12714
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.0
>         Environment: linux 3.13.0-24-generic, scala 2.10.6
>            Reporter: James Eastwood
>
> Attempting to transform a Dataset of a case class containing a nested 
> sequence of case classes causes an exception to be thrown: 
> `org.apache.spark.SparkException: Task not serializable`.
> Here is a minimal repro:
> {code}
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.{SparkContext, SparkConf}
> case class Top(a: String, nested: Array[Nested])
> case class Nested(b: String)
> object scratch {
>   def main ( args: Array[String] ) {
>     lazy val sparkConf = new 
> SparkConf().setAppName("scratch").setMaster("local[1]")
>     lazy val sparkContext = new SparkContext(sparkConf)
>     lazy val sqlContext = new SQLContext(sparkContext)
>     val input = List(
>       """{ "a": "123", "nested": [{ "b": "123" }] }"""
>     )
>     import sqlContext.implicits._
>     val ds = sqlContext.read.json(sparkContext.parallelize(input)).as[Top]
>     ds.rdd.foreach(println)
>     sparkContext.stop()
>   }
> }
> {code}
> {code}
> scalaVersion := "2.10.6"
> lazy val sparkVersion = "1.6.0"
> libraryDependencies ++= List(
>   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>   "org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
>   "org.apache.spark" %% "spark-hive" % sparkVersion % "provided"
> )
> {code}
> Full stack trace:
> {code}
> [error] (run-main-0) org.apache.spark.SparkException: Task not serializable
> org.apache.spark.SparkException: Task not serializable
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
>       at org.apache.spark.sql.Dataset.rdd(Dataset.scala:166)
>       at scratch$.main(scratch.scala:26)
>       at scratch.main(scratch.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
> Caused by: java.io.NotSerializableException: 
> scala.reflect.internal.Mirrors$Roots$EmptyPackageClass$
> Serialization stack:
>       - object not serializable (class: 
> scala.reflect.internal.Mirrors$Roots$EmptyPackageClass$, value: package 
> <empty>)
>       - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: 
> class scala.reflect.internal.Symbols$Symbol)
>       - object (class scala.reflect.internal.Types$UniqueThisType, <empty>)
>       - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: 
> class scala.reflect.internal.Types$Type)
>       - object (class scala.reflect.internal.Types$TypeRef$$anon$6, Nested)
>       - field (class: 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2,
>  name: elementType$1, type: class scala.reflect.api.Types$TypeApi)
>       - object (class 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2,
>  <function0>)
>       - field (class: 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1,
>  name: $outer, type: class 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2)
>       - object (class 
> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$constructorFor$2$$anonfun$apply$1,
>  <function1>)
>       - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, 
> name: function, type: interface scala.Function1)
>       - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, 
> mapobjects(<function1>,input[1, 
> ArrayType(StructType(StructField(b,StringType,true)),true)],StructField(b,StringType,true)))
>       - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: 
> targetObject, type: class 
> org.apache.spark.sql.catalyst.expressions.Expression)
>       - object (class org.apache.spark.sql.catalyst.expressions.Invoke, 
> invoke(mapobjects(<function1>,input[1, 
> ArrayType(StructType(StructField(b,StringType,true)),true)],StructField(b,StringType,true)),array,ObjectType(class
>  [LNested;)))
>       - writeObject data (class: scala.collection.immutable.$colon$colon)
>       - object (class scala.collection.immutable.$colon$colon, 
> List(invoke(input[0, StringType],toString,ObjectType(class 
> java.lang.String)), invoke(mapobjects(<function1>,input[1, 
> ArrayType(StructType(StructField(b,StringType,true)),true)],StructField(b,StringType,true)),array,ObjectType(class
>  [LNested;))))
>       - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, 
> name: arguments, type: interface scala.collection.Seq)
>       - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, 
> newinstance(class Top,invoke(input[0, StringType],toString,ObjectType(class 
> java.lang.String)),invoke(mapobjects(<function1>,input[1, 
> ArrayType(StructType(StructField(b,StringType,true)),true)],StructField(b,StringType,true)),array,ObjectType(class
>  [LNested;)),false,ObjectType(class Top),None))
>       - field (class: 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: 
> fromRowExpression, type: class 
> org.apache.spark.sql.catalyst.expressions.Expression)
>       - object (class 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[a[0]: string, 
> nested#ExprId(4,bc90ecfb-37ae-45bd-b0a1-7365a1a233d1): 
> array<struct<b:string>>])
>       - field (class: org.apache.spark.sql.Dataset, name: boundTEncoder, 
> type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
>       - object (class org.apache.spark.sql.Dataset, [a: string, nested: 
> array<struct<b:string>>])
>       - field (class: org.apache.spark.sql.Dataset$$anonfun$rdd$1, name: 
> $outer, type: class org.apache.spark.sql.Dataset)
>       - object (class org.apache.spark.sql.Dataset$$anonfun$rdd$1, 
> <function1>)
>       at 
> org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
>       at 
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
>       at 
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
>       at 
> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
>       at 
> org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
>       at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
>       at org.apache.spark.SparkContext.clean(SparkContext.scala:2055)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:707)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:706)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>       at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
>       at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:706)
>       at org.apache.spark.sql.Dataset.rdd(Dataset.scala:166)
>       at scratch$.main(scratch.scala:26)
>       at scratch.main(scratch.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
> {code}
> Given the messages surrounding Datasets supporting only primitive types and 
> case classes, I'm not 100% sure whether this is a bug or just as-yet unsupported.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to