I reran the same code against the current 1.6 and 2.0 snapshot builds from
https://repository.apache.org/content/repositories/snapshots/org/apache/spark/spark-core_2.11/
but I still see an exception at the same line. I have filed SPARK-12783
<https://issues.apache.org/jira/browse/SPARK-12783> for this; the exception and
the generated code are below.
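
For context, here is a minimal sketch of the kind of code that triggers this
(reconstructed from the expression tree and the earlier messages in this thread;
TestCaseClass, its columns, and the sample data are assumptions, while the shape
of collector.MyMap's constructor comes from the error itself):

    // A case class whose constructor takes an immutable Map, matching the
    // candidate in the error: "collector.MyMap(scala.collection.immutable.Map)"
    case class MyMap(map: scala.collection.immutable.Map[String, String])

    // Hypothetical source row type (columns a and b appear in the stack trace)
    case class TestCaseClass(a: String, b: String) {
      def toMyMap: MyMap = MyMap(Map(a -> b))
    }

    // Assumes an existing sqlContext, as in a 1.6-era application
    import sqlContext.implicits._
    val df1 = sqlContext.createDataFrame(Seq(TestCaseClass("a1", "b1")))
    df1.as[TestCaseClass].map(_.toMyMap).show()  // fails in GenerateSafeProjection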

13:49:07.388 [main] ERROR o.a.s.s.c.e.c.GenerateSafeProjection - failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 140, Column 47: No applicable constructor/method found for actual parameters "scala.collection.Map"; candidates are: "collector.MyMap(scala.collection.immutable.Map)"
/* 001 */
/* 002 */ public java.lang.Object generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
/* 003 */   return new SpecificSafeProjection(expr);
/* 004 */ }
/* 005 */
/* 006 */ class SpecificSafeProjection extends org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
/* 007 */
/* 008 */   private org.apache.spark.sql.catalyst.expressions.Expression[] expressions;
/* 009 */   private org.apache.spark.sql.catalyst.expressions.MutableRow mutableRow;
/* 010 */
/* 011 */
/* 012 */
/* 013 */   public SpecificSafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
/* 014 */     expressions = expr;
/* 015 */     mutableRow = new org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
/* 016 */
/* 017 */   }
/* 018 */
/* 019 */   public java.lang.Object apply(java.lang.Object _i) {
/* 020 */     InternalRow i = (InternalRow) _i;
/* 021 */     /* newinstance(class collector.MyMap,staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None) */
/* 022 */     /* staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)),true) */
/* 023 */     /* invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)) */
/* 024 */     /* mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))) */
/* 025 */     /* invoke(input[0, MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true)) */
/* 026 */     /* input[0, MapType(StringType,StringType,true)] */
/* 027 */     boolean isNull10 = i.isNullAt(0);
/* 028 */     MapData primitive11 = isNull10 ? null : (i.getMap(0));
/* 029 */
/* 030 */
/* 031 */     boolean isNull8 = isNull10;
/* 032 */     ArrayData primitive9 =
/* 033 */     isNull8 ?
/* 034 */     null : (ArrayData) primitive11.keyArray();
/* 035 */     isNull8 = primitive9 == null;
/* 036 */
/* 037 */     boolean isNull6 = primitive9 == null;
/* 038 */     ArrayData primitive7 = null;
/* 039 */
/* 040 */     if (!isNull6) {
/* 041 */       java.lang.String[] convertedArray15 = null;
/* 042 */       int dataLength14 = primitive9.numElements();
/* 043 */       convertedArray15 = new java.lang.String[dataLength14];
/* 044 */
/* 045 */       int loopIndex16 = 0;
/* 046 */       while (loopIndex16 < dataLength14) {
/* 047 */         UTF8String MapObjects_loopValue4 =
/* 048 */         (UTF8String)primitive9.getUTF8String(loopIndex16);
/* 049 */         boolean MapObjects_loopIsNull5 = isNull8 || MapObjects_loopValue4 == null;
/* 050 */
/* 051 */         /* invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class java.lang.String)) */
/* 052 */         boolean isNull12 = MapObjects_loopIsNull5;
/* 053 */         java.lang.String primitive13 =
/* 054 */         isNull12 ?
/* 055 */         null : (java.lang.String) MapObjects_loopValue4.toString();
/* 056 */         isNull12 = primitive13 == null;
/* 057 */         if (isNull12) {
/* 058 */           convertedArray15[loopIndex16] = null;
/* 059 */         } else {
/* 060 */           convertedArray15[loopIndex16] = primitive13;
/* 061 */         }
/* 062 */
/* 063 */         loopIndex16 += 1;
/* 064 */       }
/* 065 */
/* 066 */       isNull6 = false;
/* 067 */       primitive7 = new org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray15);
/* 068 */     }
/* 069 */
/* 070 */
/* 071 */     boolean isNull4 = isNull6;
/* 072 */     java.lang.Object[] primitive5 =
/* 073 */     isNull4 ?
/* 074 */     null : (java.lang.Object[]) primitive7.array();
/* 075 */     isNull4 = primitive5 == null;
/* 076 */     /* invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class [Ljava.lang.Object;)) */
/* 077 */     /* mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)),invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))) */
/* 078 */     /* invoke(input[0, MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true)) */
/* 079 */     /* input[0, MapType(StringType,StringType,true)] */
/* 080 */     boolean isNull23 = i.isNullAt(0);
/* 081 */     MapData primitive24 = isNull23 ? null : (i.getMap(0));
/* 082 */
/* 083 */
/* 084 */     boolean isNull21 = isNull23;
/* 085 */     ArrayData primitive22 =
/* 086 */     isNull21 ?
/* 087 */     null : (ArrayData) primitive24.valueArray();
/* 088 */     isNull21 = primitive22 == null;
/* 089 */
/* 090 */     boolean isNull19 = primitive22 == null;
/* 091 */     ArrayData primitive20 = null;
/* 092 */
/* 093 */     if (!isNull19) {
/* 094 */       java.lang.String[] convertedArray28 = null;
/* 095 */       int dataLength27 = primitive22.numElements();
/* 096 */       convertedArray28 = new java.lang.String[dataLength27];
/* 097 */
/* 098 */       int loopIndex29 = 0;
/* 099 */       while (loopIndex29 < dataLength27) {
/* 100 */         UTF8String MapObjects_loopValue6 =
/* 101 */         (UTF8String)primitive22.getUTF8String(loopIndex29);
/* 102 */         boolean MapObjects_loopIsNull7 = isNull21 || MapObjects_loopValue6 == null;
/* 103 */
/* 104 */         /* invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class java.lang.String)) */
/* 105 */         boolean isNull25 = MapObjects_loopIsNull7;
/* 106 */         java.lang.String primitive26 =
/* 107 */         isNull25 ?
/* 108 */         null : (java.lang.String) MapObjects_loopValue6.toString();
/* 109 */         isNull25 = primitive26 == null;
/* 110 */         if (isNull25) {
/* 111 */           convertedArray28[loopIndex29] = null;
/* 112 */         } else {
/* 113 */           convertedArray28[loopIndex29] = primitive26;
/* 114 */         }
/* 115 */
/* 116 */         loopIndex29 += 1;
/* 117 */       }
/* 118 */
/* 119 */       isNull19 = false;
/* 120 */       primitive20 = new org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray28);
/* 121 */     }
/* 122 */
/* 123 */
/* 124 */     boolean isNull17 = isNull19;
/* 125 */     java.lang.Object[] primitive18 =
/* 126 */     isNull17 ?
/* 127 */     null : (java.lang.Object[]) primitive20.array();
/* 128 */     isNull17 = primitive18 == null;
/* 129 */
/* 130 */     boolean isNull2 = !!(isNull4 || isNull17);
/* 131 */     scala.collection.Map primitive3 = null;
/* 132 */
/* 133 */     if (!(isNull4 || isNull17)) {
/* 134 */       primitive3 = org.apache.spark.sql.catalyst.util.ArrayBasedMapData.toScalaMap(primitive5, primitive18);
/* 135 */       isNull2 = primitive3 == null;
/* 136 */     }
/* 137 */
/* 138 */
/* 139 */
/* 140 */     final collector.MyMap primitive1 = new collector.MyMap(primitive3);
/* 141 */     final boolean isNull0 = false;
/* 142 */     if (isNull0) {
/* 143 */       mutableRow.setNullAt(0);
/* 144 */     } else {
/* 145 */
/* 146 */       mutableRow.update(0, primitive1);
/* 147 */     }
/* 148 */
/* 149 */     return mutableRow;
/* 150 */   }
/* 151 */ }
/* 152 */
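
The failure is at generated line 140: line 134 produces a scala.collection.Map
via ArrayBasedMapData.toScalaMap, but the collector.MyMap constructor only
accepts a scala.collection.immutable.Map, so the compiler finds no applicable
constructor. Until SPARK-12783 is resolved, one possible workaround (an untested
sketch, not a confirmed fix) is to widen the case-class field to the interface
type the generated code actually produces:

    // Untested workaround sketch: declare the field as scala.collection.Map,
    // matching the value that toScalaMap returns in the generated code.
    case class MyMap(map: scala.collection.Map[String, String])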

Thanks.



On Tue, Jan 12, 2016 at 11:35 AM, Muthu Jayakumar <bablo...@gmail.com> wrote:

> Thanks Michael. Let me test it against a recent master branch.
>
> Also, do I have to create a new case class for every mapping step? I
> cannot use tuples, as I have ~130 columns to process. Earlier I used a
> Seq[Any] (actually an Array[Any], to optimize serialization) and processed
> it as an RDD, building the schema at runtime (roughly as in the sketch
> below). Now I am attempting to replace this with a Dataset.
>
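> For reference, the runtime-schema approach being replaced looks roughly like
> this (a sketch with assumed column names and data, not the original code;
> sc and sqlContext are assumed to exist):
>
>     import org.apache.spark.sql.Row
>     import org.apache.spark.sql.types.{StringType, StructField, StructType}
>
>     // Column names known only at runtime (~130 in practice)
>     val columns = Seq("a", "b")
>     val schema = StructType(columns.map(c => StructField(c, StringType, nullable = true)))
>
>     // Each record is an Array[Any] laid out in the schema's column order
>     val rdd = sc.parallelize(Seq(Array[Any]("data1", "data2")))
>     val df = sqlContext.createDataFrame(rdd.map(arr => Row.fromSeq(arr)), schema)
>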
> > the problem is that at compile time we don't know if it's an inner or
> > outer join.
> May I suggest having different methods for the different kinds of joins
> (similar to the RDD API)? That way the type safety is enforced.
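> For illustration, such an API might look something like this (hypothetical
> signatures, not something Spark provides today):
>
>     def innerJoinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
>     def leftOuterJoinWith[U](other: Dataset[U], condition: Column): Dataset[(T, Option[U])]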
>
> Here is the error message.
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
> Serialization stack:
> - object not serializable (class: scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1, value: package lang)
> - field (class: scala.reflect.internal.Types$ThisType, name: sym, type: class scala.reflect.internal.Symbols$Symbol)
> - object (class scala.reflect.internal.Types$UniqueThisType, java.lang.type)
> - field (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$ClassNoArgsTypeRef, String)
> - field (class: scala.reflect.internal.Types$TypeRef, name: normalized, type: class scala.reflect.internal.Types$Type)
> - object (class scala.reflect.internal.Types$AliasNoArgsTypeRef, String)
> - field (class: org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1, type: class scala.reflect.api.Types$TypeApi)
> - object (class org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, )
> - field (class: org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type: interface scala.Function1)
> - object (class org.apache.spark.sql.catalyst.expressions.MapObjects, mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType))
> - field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name: targetObject, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.expressions.Invoke, invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)))
> - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@7e78c3cf)
> - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon, List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)), invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;))))
> - field (class: org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments, type: interface scala.collection.Seq)
> - object (class org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true))
> - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@377795c5)
> - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon, List(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true)))
> - field (class: org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments, type: interface scala.collection.Seq)
> - object (class org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class collector.MyMap,staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field (class: "scala.collection.immutable.Map", name: "map"),- root class: "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None))
> - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, name: fromRowExpression, type: class org.apache.spark.sql.catalyst.expressions.Expression)
> - object (class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder, class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map])
> - field (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type: class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder)
> - object (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions , class[a[0]: string, b[0]: string], class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13] +- LocalTableScan [a#2,b#3], [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]] )
> - field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8, name: $outer, type: class org.apache.spark.sql.execution.MapPartitions)
> - object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, )
> - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, name: f$22, type: interface scala.Function1)
> - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, )
> - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, name: $outer, type: class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
> - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21, )
> - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type: interface scala.Function3)
> - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at CollectorSparkTest.scala:50)
> - field (class: org.apache.spark.NarrowDependency, name: rdd, type: class org.apache.spark.rdd.RDD)
> - object (class org.apache.spark.OneToOneDependency, org.apache.spark.OneToOneDependency@110f15b7)
> - writeObject data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.List$SerializationProxy, scala.collection.immutable.List$SerializationProxy@6bb23696)
> - writeReplace data (class: scala.collection.immutable.List$SerializationProxy)
> - object (class scala.collection.immutable.$colon$colon, List(org.apache.spark.OneToOneDependency@110f15b7))
> - field (class: org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies, type: interface scala.collection.Seq)
> - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50)
> - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
> - object (class scala.Tuple2, (MapPartitionsRDD[2] at show at CollectorSparkTest.scala:50,))
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>   at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>   at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010)
>   at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
>   at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>   at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>   at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
>   at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
>   at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
>   at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
>   at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
>   at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>   at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>   at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
>   at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
>   at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
>   at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
>   at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
>   at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138)
>   at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413)
>   at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495)
>   at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171)
>   at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:228)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:192)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:200)
>
> On Tue, Jan 12, 2016 at 10:39 AM, Michael Armbrust <mich...@databricks.com> wrote:
>
>>> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
>>>
>>> This looks like a bug.  What is the error?  It might be fixed in
>> branch-1.6/master if you can test there.
>>
>>> Please advise on what I may be missing here.
>>>
>>>
>>> Also, for joins, may I suggest a custom encoder / transformation that
>>> says how two datasets should merge?
>>> Also, when a join is made using something like 'left outer join', the
>>> right-side object should ideally be an Option (similar to what's seen in
>>> the RDD API). I think this would make it strongly typed?
>>>
>>
>> I think you can actually use as to convert this to an Option if you'd
>> like type safety. The problem is that at compile time we don't know if it's
>> an inner or outer join.
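>>
>> For instance, roughly (a sketch; dsA, dsB, the case classes A and B, and
>> the join condition are assumed, and the Option view is applied after the
>> fact as described above):
>>
>>     val joined: Dataset[(A, B)] = dsA.joinWith(dsB, condition, "left_outer")
>>     // The right side can be null after a left outer join; viewing the pair
>>     // as (A, Option[B]) restores type safety:
>>     val typed: Dataset[(A, Option[B])] = joined.as[(A, Option[B])]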
>>
>>
>
>
