Awesome, thanks for opening the JIRA! We'll take a look.

On Tue, Jan 12, 2016 at 1:53 PM, Muthu Jayakumar <bablo...@gmail.com> wrote:
> I tried to rerun the same code with the current snapshot versions of 1.6 and 2.0 from
> https://repository.apache.org/content/repositories/snapshots/org/apache/spark/spark-core_2.11/
> but I still see an exception around the same line. Here is the exception below. I have filed an issue for the same: SPARK-12783 <https://issues.apache.org/jira/browse/SPARK-12783>
>
> 13:49:07.388 [main] ERROR o.a.s.s.c.e.c.GenerateSafeProjection - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 140, Column 47: No applicable constructor/method found for actual parameters "scala.collection.Map"; candidates are: "collector.MyMap(scala.collection.immutable.Map)"
> /* 001 */
> /* 002 */ public java.lang.Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
> /* 003 */   return new SpecificSafeProjection(expr);
> /* 004 */ }
> /* 005 */
> /* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
> /* 007 */
> /* 008 */   private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
> /* 009 */   private org.apache.spark.sql.catalyst.expressions.MutableRow mutableRow;
> /* 010 */
> /* 011 */
> /* 012 */
> /* 013 */   public SpecificSafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
> /* 014 */     expressions = expr;
> /* 015 */     mutableRow = new org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
> /* 016 */
> /* 017 */   }
> /* 018 */
> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
> /* 020 */     InternalRow i = (InternalRow) _i;
> /* 021 */     /* newinstance(class collector.MyMap,staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None) */
> /* 022 */     /* staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),true) */
> /* 023 */     /* invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)) */
> /* 024 */     /* mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))) */
> /* 025 */     /* invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true)) */
> /* 026 */     /* input[0, MapType(StringType,StringType,true)] */
> /* 027 */     boolean isNull10 = i.isNullAt(0);
> /* 028 */     MapData primitive11 = isNull10 ? null : (i.getMap(0));
> /* 029 */
> /* 030 */
> /* 031 */     boolean isNull8 = isNull10;
> /* 032 */     ArrayData primitive9 =
> /* 033 */     isNull8 ?
> /* 034 */     null : (ArrayData) primitive11.keyArray();
> /* 035 */     isNull8 = primitive9 == null;
> /* 036 */
> /* 037 */     boolean isNull6 = primitive9 == null;
> /* 038 */     ArrayData primitive7 = null;
> /* 039 */
> /* 040 */     if (!isNull6) {
> /* 041 */       java.lang.String[] convertedArray15 = null;
> /* 042 */       int dataLength14 = primitive9.numElements();
> /* 043 */       convertedArray15 = new java.lang.String[dataLength14];
> /* 044 */
> /* 045 */       int loopIndex16 = 0;
> /* 046 */       while (loopIndex16 < dataLength14) {
> /* 047 */         UTF8String MapObjects_loopValue4 =
> /* 048 */         (UTF8String)primitive9.getUTF8String(loopIndex16);
> /* 049 */         boolean MapObjects_loopIsNull5 = isNull8 || MapObjects_loopValue4 == null;
> /* 050 */
> /* 051 */         /* invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)) */
> /* 052 */         boolean isNull12 = MapObjects_loopIsNull5;
> /* 053 */         java.lang.String primitive13 =
> /* 054 */         isNull12 ?
> /* 055 */         null : (java.lang.String) MapObjects_loopValue4.toString();
> /* 056 */         isNull12 = primitive13 == null;
> /* 057 */         if (isNull12) {
> /* 058 */           convertedArray15[loopIndex16] = null;
> /* 059 */         } else {
> /* 060 */           convertedArray15[loopIndex16] = primitive13;
> /* 061 */         }
> /* 062 */
> /* 063 */         loopIndex16 += 1;
> /* 064 */       }
> /* 065 */
> /* 066 */       isNull6 = false;
> /* 067 */       primitive7 = new org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray15);
> /* 068 */     }
> /* 069 */
> /* 070 */
> /* 071 */     boolean isNull4 = isNull6;
> /* 072 */     java.lang.Object[] primitive5 =
> /* 073 */     isNull4 ?
> /* 074 */     null : (java.lang.Object[]) primitive7.array();
> /* 075 */     isNull4 = primitive5 == null;
> /* 076 */     /* invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)) */
> /* 077 */     /* mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))) */
> /* 078 */     /* invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true)) */
> /* 079 */     /* input[0, MapType(StringType,StringType,true)] */
> /* 080 */     boolean isNull23 = i.isNullAt(0);
> /* 081 */     MapData primitive24 = isNull23 ? null : (i.getMap(0));
> /* 082 */
> /* 083 */
> /* 084 */     boolean isNull21 = isNull23;
> /* 085 */     ArrayData primitive22 =
> /* 086 */     isNull21 ?
> /* 087 */     null : (ArrayData) primitive24.valueArray();
> /* 088 */     isNull21 = primitive22 == null;
> /* 089 */
> /* 090 */     boolean isNull19 = primitive22 == null;
> /* 091 */     ArrayData primitive20 = null;
> /* 092 */
> /* 093 */     if (!isNull19) {
> /* 094 */       java.lang.String[] convertedArray28 = null;
> /* 095 */       int dataLength27 = primitive22.numElements();
> /* 096 */       convertedArray28 = new java.lang.String[dataLength27];
> /* 097 */
> /* 098 */       int loopIndex29 = 0;
> /* 099 */       while (loopIndex29 < dataLength27) {
> /* 100 */         UTF8String MapObjects_loopValue6 =
> /* 101 */         (UTF8String)primitive22.getUTF8String(loopIndex29);
> /* 102 */         boolean MapObjects_loopIsNull7 = isNull21 || MapObjects_loopValue6 == null;
> /* 103 */
> /* 104 */         /* invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)) */
> /* 105 */         boolean isNull25 = MapObjects_loopIsNull7;
> /* 106 */         java.lang.String primitive26 =
> /* 107 */         isNull25 ?
> /* 108 */         null : (java.lang.String) MapObjects_loopValue6.toString();
> /* 109 */         isNull25 = primitive26 == null;
> /* 110 */         if (isNull25) {
> /* 111 */           convertedArray28[loopIndex29] = null;
> /* 112 */         } else {
> /* 113 */           convertedArray28[loopIndex29] = primitive26;
> /* 114 */         }
> /* 115 */
> /* 116 */         loopIndex29 += 1;
> /* 117 */       }
> /* 118 */
> /* 119 */       isNull19 = false;
> /* 120 */       primitive20 = new org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray28);
> /* 121 */     }
> /* 122 */
> /* 123 */
> /* 124 */     boolean isNull17 = isNull19;
> /* 125 */     java.lang.Object[] primitive18 =
> /* 126 */     isNull17 ?
> /* 127 */     null : (java.lang.Object[]) primitive20.array();
> /* 128 */     isNull17 = primitive18 == null;
> /* 129 */
> /* 130 */     boolean isNull2 = !!(isNull4 || isNull17);
> /* 131 */     scala.collection.Map primitive3 = null;
> /* 132 */
> /* 133 */     if (!(isNull4 || isNull17)) {
> /* 134 */       primitive3 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.toScalaMap(primitive5, primitive18);
> /* 135 */       isNull2 = primitive3 == null;
> /* 136 */     }
> /* 137 */
> /* 138 */
> /* 139 */
> /* 140 */     final collector.MyMap primitive1 = new collector.MyMap(primitive3);
> /* 141 */     final boolean isNull0 = false;
> /* 142 */     if (isNull0) {
> /* 143 */       mutableRow.setNullAt(0);
> /* 144 */     } else {
> /* 145 */
> /* 146 */       mutableRow.update(0, primitive1);
> /* 147 */     }
> /* 148 */
> /* 149 */     return mutableRow;
> /* 150 */   }
> /* 151 */ }
> /* 152 */
>
> Thanks.
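For reference, a minimal sketch of the kind of program being discussed. The names collector.MyMap, its map field, TestCaseClass(a, b) and toMyMap come from the compile error and the quoted messages in this thread; the sample data and the SparkContext setup are assumptions, so treat this as an illustration rather than the exact reproduction attached to SPARK-12783.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Constructor signature seen in the compile error: MyMap(scala.collection.immutable.Map)
case class MyMap(map: Map[String, String])

case class TestCaseClass(a: String, b: String) {
  // One possible shape of the mapping step that needs an encoder for MyMap
  def toMyMap: MyMap = MyMap(Map(a -> b))
}

object Spark12783Repro {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("spark-12783-repro").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df1 = sc.parallelize(Seq(TestCaseClass("2015-05-10", "data1"),
                                 TestCaseClass("2015-05-10", "data2"))).toDF()

    // show() forces execution of the generated projection and hits the
    // CompileException at Line 140 shown above
    df1.as[TestCaseClass].map(_.toMyMap).show()

    sc.stop()
  }
}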
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;))) - writeObject data (class: >> scala.collection.immutable.List$SerializationProxy) - object (class >> scala.collection.immutable.List$SerializationProxy, >> scala.collection.immutable.List$SerializationProxy@7e78c3cf) - >> writeReplace data (class: >> scala.collection.immutable.List$SerializationProxy) - object (class >> scala.collection.immutable.$colon$colon, >> List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)), >> invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)))) - field (class: >> org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, >> type: interface scala.collection.Seq) - object (class >> org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class >> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface >> scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)),true)) - writeObject data (class: >> scala.collection.immutable.List$SerializationProxy) - object (class >> scala.collection.immutable.List$SerializationProxy, >> scala.collection.immutable.List$SerializationProxy@377795c5) - >> writeReplace data (class: >> scala.collection.immutable.List$SerializationProxy) - object (class >> scala.collection.immutable.$colon$colon, List(staticinvoke(class >> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface >> scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)),true))) - field (class: >> org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, >> type: interface scala.collection.Seq) - object (class >> org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class >> collector.MyMap,staticinvoke(class >> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface >> scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: 
"scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- >> field (class: "scala.collection.immutable.Map", name: "map"),- root class: >> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class >> [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None)) >> - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, >> name: fromRowExpression, type: class >> org.apache.spark.sql.catalyst.expressions.Expression) - object (class >> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, >> class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map]) - field >> (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type: >> class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) - object >> (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions , >> class[a[0]: string, b[0]: string], >> class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13] +- >> LocalTableScan [a#2,b#3], >> [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]] >> ) - field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, >> name: $outer, type: class org.apache.spark.sql.execution.MapPartitions) - >> object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, ) - >> field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, >> name: f$22, type: interface scala.Function1) - object (class >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, ) - field >> (class: >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, >> name: $outer, type: class >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1) - object (class >> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, >> ) - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: >> interface scala.Function3) - object (class >> org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at >> CollectorSparkTest.scala:50) - field (class: >> org.apache.spark.NarrowDependency, name: rdd, type: class >> org.apache.spark.rdd.RDD) - object (class >> org.apache.spark.OneToOneDependency, >> org.apache.spark.OneToOneDependency@110f15b7) - writeObject data (class: >> scala.collection.immutable.List$SerializationProxy) - object (class >> scala.collection.immutable.List$SerializationProxy, >> scala.collection.immutable.List$SerializationProxy@6bb23696) - >> writeReplace data (class: >> scala.collection.immutable.List$SerializationProxy) - object (class >> scala.collection.immutable.$colon$colon, >> List(org.apache.spark.OneToOneDependency@110f15b7)) - field (class: >> org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, >> type: interface scala.collection.Seq) - object (class >> org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at >> CollectorSparkTest.scala:50) - field (class: scala.Tuple2, name: _1, type: >> class java.lang.Object) - object (class scala.Tuple2, (MapPartitionsRDD[2] >> at show at CollectorSparkTest.scala:50,)) at >> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) >> at >> 
>>
>> On Tue, Jan 12, 2016 at 10:39 AM, Michael Armbrust <mich...@databricks.com> wrote:
>>
>>>> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
>>>
>>> This looks like a bug. What is the error? It might be fixed in branch-1.6/master if you can test there.
>>>
>>>> Please advise on what I may be missing here?
>>>>
>>>> Also for join, may I suggest having a custom encoder / transformation to say how two datasets can merge?
>>>> Also, when a join is made using something like 'left outer join', the right side object should ideally be an Option kind (similar to what's seen in RDD). And I think this may make it strongly typed?
>>>
>>> I think you can actually use as to convert this to an Option if you'd like type safety. The problem is that at compile time we don't know if it's an inner or outer join.
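A sketch of what the "use as to convert this to an Option" suggestion could look like with joinWith. Employee, Dept and the join key are made up, it assumes a spark-shell style sqlContext, and whether the Option[Dept] encoder resolves on a given 1.6 build is exactly the open question in this thread, so treat it as illustrative rather than a confirmed recipe.

import sqlContext.implicits._

case class Employee(name: String, deptId: Int)
case class Dept(id: Int, deptName: String)

val employees = Seq(Employee("alice", 1), Employee("bob", 2)).toDS()
val depts     = Seq(Dept(1, "eng")).toDS()

// "left_outer" keeps every employee; with the suggested as[...] step, an
// unmatched right side would surface as None instead of a null Dept.
val joined = employees
  .joinWith(depts, $"deptId" === $"id", "left_outer")
  .as[(Employee, Option[Dept])]

joined.show()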