Thanks Michael. Let me test it against a recent build of the master branch.

Also, do I have to create a new case class for every mapping step? I
cannot use tuples, since I have ~130 columns to process. Earlier I used a
Seq[Any] (actually an Array[Any], to cut serialization overhead) and
processed it with the RDD API, building the schema at runtime. Now I am
attempting to replace this with Dataset.
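
For context, here is roughly the pattern I am trying to replace (the column
names and types below are just placeholders, and sc / sqlContext are the usual
Spark 1.6 entry points):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Schema assembled at runtime from column metadata (the real job has ~130 columns).
val columnNames = Seq("col1", "col2", "col3") // placeholder names
val schema = StructType(columnNames.map(name => StructField(name, StringType, nullable = true)))

// Each record is carried as an Array[Any] and only converted to a Row at the end.
val rowRdd = sc.parallelize(Seq(
  Array[Any]("a", "b", "c"),
  Array[Any]("d", "e", "f")
)).map(values => Row.fromSeq(values.toSeq))

val df = sqlContext.createDataFrame(rowRdd, schema)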

> the problem is that at compile time we don't know if its an inner or outer join.

May I suggest having separate methods for the different kinds of joins
(similar to the RDD API)? That way the type safety is enforced.
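
For example, the pair-RDD API already encodes the join kind in the method name
and the outer side in the return type, which is what I was hoping for on
Dataset (the tiny RDDs below are just for illustration):

val left  = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
val right = sc.parallelize(Seq(1 -> "x"))

// join          : RDD[(Int, (String, String))]         -- both sides always present
// leftOuterJoin : RDD[(Int, (String, Option[String]))] -- missing right side is None
val inner = left.join(right)
val outer = left.leftOuterJoin(right) // key 2 comes back as (2, ("b", None))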

Here is the error message.

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
Serialization stack:
- object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
- field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
- object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
- field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
- field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
- object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
- field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
- object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
- field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
- object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
- field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e78c3cf)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;))))
- field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@377795c5)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true)))
- field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq)
- object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class collector.MyMap,staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None))
- field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
- object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map])
- field (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
- object (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions , class[a[0]: string, b[0]: string], class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13]
+- LocalTableScan [a#2,b#3], [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]] )
- field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions)
- object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, )
- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, )
- field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
- object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, )
- field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at CollectorSparkTest.scala:50)
- field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
- object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@110f15b7)
- writeObject data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6bb23696)
- writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
- object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@110f15b7))
- field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
- object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394)
at org.apache.spark.sql.Dataset.show(Dataset.scala:228)
at org.apache.spark.sql.Dataset.show(Dataset.scala:192)
at org.apache.spark.sql.Dataset.show(Dataset.scala:200)
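
For reference, the failing call is the df1.as[TestCaseClass].map(_.toMyMap).show()
from the earlier mail; the classes involved boil down to roughly the following
sketch reconstructed from the stack trace (the field names a, b and the shape of
MyMap come from the trace, but the body of toMyMap is only illustrative and may
differ from my actual code):

package collector

case class TestCaseClass(a: String, b: String) {
  // Illustrative only: builds a Map[String, String] from the two columns.
  def toMyMap: MyMap = MyMap(Map(a -> b))
}

case class MyMap(map: Map[String, String])

// With a Spark 1.6-style sqlContext in scope:
//   import sqlContext.implicits._
//   val df1 = Seq(TestCaseClass("2015-05-01", "data1"),
//                 TestCaseClass("2015-05-01", "data2")).toDF()
//   df1.as[TestCaseClass].map(_.toMyMap).show()  // fails with the exception above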

On Tue, Jan 12, 2016 at 10:39 AM, Michael Armbrust <mich...@databricks.com>
wrote:

>> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
>>
>
> This looks like a bug.  What is the error?  It might be fixed in
> branch-1.6/master if you can test there.
>
>> Please advice on what I may be missing here?
>>
>>
>> Also for join, may I suggest to have a custom encoder / transformation to
>> say how 2 datasets can merge?
>> Also, when a join in made using something like 'left outer join' the
>> right side object should ideally be Option kind (similar to what's seen in
>> RDD). And I think this may make it strongly typed?
>>
>
> I think you can actually use as to convert this to an Option if you'd like
> typesafety.  the problem is that at compile time we don't know if its an
> inner or outer join.
>
>
