Awesome, thanks for opening the JIRA!  We'll take a look.

On Tue, Jan 12, 2016 at 1:53 PM, Muthu Jayakumar <bablo...@gmail.com> wrote:

> I tried to rerun the same code with the current snapshot versions of 1.6
> and 2.0 from
> https://repository.apache.org/content/repositories/snapshots/org/apache/spark/spark-core_2.11/
>
> But I still see an exception around the same line. The exception is
> below. I filed an issue for it, SPARK-12783
> <https://issues.apache.org/jira/browse/SPARK-12783>
>
> 13:49:07.388 [main] ERROR o.a.s.s.c.e.c.GenerateSafeProjection - failed
> to compile: org.codehaus.commons.compiler.CompileException: File
> 'generated.java', Line 140, Column 47: No applicable constructor/method
> found for actual parameters "scala.collection.Map"; candidates are:
> "collector.MyMap(scala.collection.immutable.Map)"
> /* 001 */
> /* 002 */ public java.lang.Object
> generate(org.apache.spark.sql.catalyst.expressions.Expression[] expr) {
> /* 003 */   return new SpecificSafeProjection(expr);
> /* 004 */ }
> /* 005 */
> /* 006 */ class SpecificSafeProjection extends
> org.apache.spark.sql.catalyst.expressions.codegen.BaseProjection {
> /* 007 */
> /* 008 */   private org.apache.spark.sql.catalyst.expressions.Expression[]
> expressions;
> /* 009 */   private org.apache.spark.sql.catalyst.expressions.MutableRow
> mutableRow;
> /* 010 */
> /* 011 */
> /* 012 */
> /* 013 */   public
> SpecificSafeProjection(org.apache.spark.sql.catalyst.expressions.Expression[]
> expr) {
> /* 014 */     expressions = expr;
> /* 015 */     mutableRow = new
> org.apache.spark.sql.catalyst.expressions.GenericMutableRow(1);
> /* 016 */
> /* 017 */   }
> /* 018 */
> /* 019 */   public java.lang.Object apply(java.lang.Object _i) {
> /* 020 */     InternalRow i = (InternalRow) _i;
> /* 021 */     /* newinstance(class collector.MyMap,staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
> scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class
> [Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class
> [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None) */
> /* 022 */     /* staticinvoke(class
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
> scala.collection.Map),toScalaMap,invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class
> [Ljava.lang.Object;)),invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class
> [Ljava.lang.Object;)),true) */
> /* 023 */     /*
> invoke(mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true))),array,ObjectType(class
> [Ljava.lang.Object;)) */
> /* 024 */     /*
> mapobjects(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true)))
> */
> /* 025 */     /* invoke(input[0,
> MapType(StringType,StringType,true)],keyArray,ArrayType(StringType,true)) */
> /* 026 */     /* input[0, MapType(StringType,StringType,true)] */
> /* 027 */     boolean isNull10 = i.isNullAt(0);
> /* 028 */     MapData primitive11 = isNull10 ? null : (i.getMap(0));
> /* 029 */
> /* 030 */
> /* 031 */     boolean isNull8 = isNull10;
> /* 032 */     ArrayData primitive9 =
> /* 033 */     isNull8 ?
> /* 034 */     null : (ArrayData) primitive11.keyArray();
> /* 035 */     isNull8 = primitive9 == null;
> /* 036 */
> /* 037 */     boolean isNull6 = primitive9 == null;
> /* 038 */     ArrayData primitive7 = null;
> /* 039 */
> /* 040 */     if (!isNull6) {
> /* 041 */       java.lang.String[] convertedArray15 = null;
> /* 042 */       int dataLength14 = primitive9.numElements();
> /* 043 */       convertedArray15 = new java.lang.String[dataLength14];
> /* 044 */
> /* 045 */       int loopIndex16 = 0;
> /* 046 */       while (loopIndex16 < dataLength14) {
> /* 047 */         UTF8String MapObjects_loopValue4 =
> /* 048 */         (UTF8String)primitive9.getUTF8String(loopIndex16);
> /* 049 */         boolean MapObjects_loopIsNull5 = isNull8 ||
> MapObjects_loopValue4 == null;
> /* 050 */
> /* 051 */         /*
> invoke(lambdavariable(MapObjects_loopValue4,MapObjects_loopIsNull5,StringType),toString,ObjectType(class
> java.lang.String)) */
> /* 052 */         boolean isNull12 = MapObjects_loopIsNull5;
> /* 053 */         java.lang.String primitive13 =
> /* 054 */         isNull12 ?
> /* 055 */         null : (java.lang.String)
> MapObjects_loopValue4.toString();
> /* 056 */         isNull12 = primitive13 == null;
> /* 057 */         if (isNull12) {
> /* 058 */           convertedArray15[loopIndex16] = null;
> /* 059 */         } else {
> /* 060 */           convertedArray15[loopIndex16] = primitive13;
> /* 061 */         }
> /* 062 */
> /* 063 */         loopIndex16 += 1;
> /* 064 */       }
> /* 065 */
> /* 066 */       isNull6 = false;
> /* 067 */       primitive7 = new
> org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray15);
> /* 068 */     }
> /* 069 */
> /* 070 */
> /* 071 */     boolean isNull4 = isNull6;
> /* 072 */     java.lang.Object[] primitive5 =
> /* 073 */     isNull4 ?
> /* 074 */     null : (java.lang.Object[]) primitive7.array();
> /* 075 */     isNull4 = primitive5 == null;
> /* 076 */     /*
> invoke(mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))),array,ObjectType(class
> [Ljava.lang.Object;)) */
> /* 077 */     /*
> mapobjects(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class
> java.lang.String)),invoke(input[0,
> MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true)))
> */
> /* 078 */     /* invoke(input[0,
> MapType(StringType,StringType,true)],valueArray,ArrayType(StringType,true))
> */
> /* 079 */     /* input[0, MapType(StringType,StringType,true)] */
> /* 080 */     boolean isNull23 = i.isNullAt(0);
> /* 081 */     MapData primitive24 = isNull23 ? null : (i.getMap(0));
> /* 082 */
> /* 083 */
> /* 084 */     boolean isNull21 = isNull23;
> /* 085 */     ArrayData primitive22 =
> /* 086 */     isNull21 ?
> /* 087 */     null : (ArrayData) primitive24.valueArray();
> /* 088 */     isNull21 = primitive22 == null;
> /* 089 */
> /* 090 */     boolean isNull19 = primitive22 == null;
> /* 091 */     ArrayData primitive20 = null;
> /* 092 */
> /* 093 */     if (!isNull19) {
> /* 094 */       java.lang.String[] convertedArray28 = null;
> /* 095 */       int dataLength27 = primitive22.numElements();
> /* 096 */       convertedArray28 = new java.lang.String[dataLength27];
> /* 097 */
> /* 098 */       int loopIndex29 = 0;
> /* 099 */       while (loopIndex29 < dataLength27) {
> /* 100 */         UTF8String MapObjects_loopValue6 =
> /* 101 */         (UTF8String)primitive22.getUTF8String(loopIndex29);
> /* 102 */         boolean MapObjects_loopIsNull7 = isNull21 ||
> MapObjects_loopValue6 == null;
> /* 103 */
> /* 104 */         /*
> invoke(lambdavariable(MapObjects_loopValue6,MapObjects_loopIsNull7,StringType),toString,ObjectType(class
> java.lang.String)) */
> /* 105 */         boolean isNull25 = MapObjects_loopIsNull7;
> /* 106 */         java.lang.String primitive26 =
> /* 107 */         isNull25 ?
> /* 108 */         null : (java.lang.String)
> MapObjects_loopValue6.toString();
> /* 109 */         isNull25 = primitive26 == null;
> /* 110 */         if (isNull25) {
> /* 111 */           convertedArray28[loopIndex29] = null;
> /* 112 */         } else {
> /* 113 */           convertedArray28[loopIndex29] = primitive26;
> /* 114 */         }
> /* 115 */
> /* 116 */         loopIndex29 += 1;
> /* 117 */       }
> /* 118 */
> /* 119 */       isNull19 = false;
> /* 120 */       primitive20 = new
> org.apache.spark.sql.catalyst.util.GenericArrayData(convertedArray28);
> /* 121 */     }
> /* 122 */
> /* 123 */
> /* 124 */     boolean isNull17 = isNull19;
> /* 125 */     java.lang.Object[] primitive18 =
> /* 126 */     isNull17 ?
> /* 127 */     null : (java.lang.Object[]) primitive20.array();
> /* 128 */     isNull17 = primitive18 == null;
> /* 129 */
> /* 130 */     boolean isNull2 = !!(isNull4 || isNull17);
> /* 131 */     scala.collection.Map primitive3 = null;
> /* 132 */
> /* 133 */     if (!(isNull4 || isNull17)) {
> /* 134 */       primitive3 =
> org.apache.spark.sql.catalyst.util.ArrayBasedMapData.toScalaMap(primitive5,
> primitive18);
> /* 135 */       isNull2 = primitive3 == null;
> /* 136 */     }
> /* 137 */
> /* 138 */
> /* 139 */
> /* 140 */     final collector.MyMap primitive1 = new
> collector.MyMap(primitive3);
> /* 141 */     final boolean isNull0 = false;
> /* 142 */     if (isNull0) {
> /* 143 */       mutableRow.setNullAt(0);
> /* 144 */     } else {
> /* 145 */
> /* 146 */       mutableRow.update(0, primitive1);
> /* 147 */     }
> /* 148 */
> /* 149 */     return mutableRow;
> /* 150 */   }
> /* 151 */ }
> /* 152 */
>
> Thanks.
>
>
>
> On Tue, Jan 12, 2016 at 11:35 AM, Muthu Jayakumar <bablo...@gmail.com>
> wrote:
>
>> Thanks Michael. Let me test it with a recent master code branch.
>>
>> Also, do I have to create a new case class for every mapping step? I
>> cannot use a Tuple as I have ~130 columns to process. Earlier I used a
>> Seq[Any] (actually Array[Any], to optimize serialization) and processed
>> it with RDDs (building the schema at runtime). Now I am attempting to
>> replace this with Dataset.
>>
>> >the problem is that at compile time we don't know if its an inner or
>> outer join.
>> May I suggest having different methods for the different kinds of joins
>> (similar to the RDD API)? That way type safety is enforced.
>>
>> Here is the error message.
>>
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due to stage failure: Task not serializable:
>> java.io.NotSerializableException:
>> scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1
>> Serialization stack: - object not serializable (class:
>> scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$1,
>> value: package lang) - field (class: scala.reflect.internal.Types$ThisType,
>> name: sym, type: class scala.reflect.internal.Symbols$Symbol) - object
>> (class scala.reflect.internal.Types$UniqueThisType, java.lang.type) - field
>> (class: scala.reflect.internal.Types$TypeRef, name: pre, type: class
>> scala.reflect.internal.Types$Type) - object (class
>> scala.reflect.internal.Types$ClassNoArgsTypeRef, String) - field (class:
>> scala.reflect.internal.Types$TypeRef, name: normalized, type: class
>> scala.reflect.internal.Types$Type) - object (class
>> scala.reflect.internal.Types$AliasNoArgsTypeRef, String) - field (class:
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, name: keyType$1,
>> type: class scala.reflect.api.Types$TypeApi) - object (class
>> org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$6, ) - field (class:
>> org.apache.spark.sql.catalyst.expressions.MapObjects, name: function, type:
>> interface scala.Function1) - object (class
>> org.apache.spark.sql.catalyst.expressions.MapObjects,
>> mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),- field
>> (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType)) -
>> field (class: org.apache.spark.sql.catalyst.expressions.Invoke, name:
>> targetObject, type: class
>> org.apache.spark.sql.catalyst.expressions.Expression) - object (class
>> org.apache.spark.sql.catalyst.expressions.Invoke,
>> invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;))) - writeObject data (class:
>> scala.collection.immutable.List$SerializationProxy) - object (class
>> scala.collection.immutable.List$SerializationProxy,
>> scala.collection.immutable.List$SerializationProxy@7e78c3cf) -
>> writeReplace data (class:
>> scala.collection.immutable.List$SerializationProxy) - object (class
>> scala.collection.immutable.$colon$colon,
>> List(invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)),
>> invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)))) - field (class:
>> org.apache.spark.sql.catalyst.expressions.StaticInvoke, name: arguments,
>> type: interface scala.collection.Seq) - object (class
>> org.apache.spark.sql.catalyst.expressions.StaticInvoke, staticinvoke(class
>> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
>> scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)),true)) - writeObject data (class:
>> scala.collection.immutable.List$SerializationProxy) - object (class
>> scala.collection.immutable.List$SerializationProxy,
>> scala.collection.immutable.List$SerializationProxy@377795c5) -
>> writeReplace data (class:
>> scala.collection.immutable.List$SerializationProxy) - object (class
>> scala.collection.immutable.$colon$colon, List(staticinvoke(class
>> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
>> scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)),true))) - field (class:
>> org.apache.spark.sql.catalyst.expressions.NewInstance, name: arguments,
>> type: interface scala.collection.Seq) - object (class
>> org.apache.spark.sql.catalyst.expressions.NewInstance, newinstance(class
>> collector.MyMap,staticinvoke(class
>> org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,ObjectType(interface
>> scala.collection.Map),toScalaMap,invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),keyArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)),invoke(mapobjects(,invoke(upcast('map,MapType(StringType,StringType,true),-
>> field (class: "scala.collection.immutable.Map", name: "map"),- root class:
>> "collector.MyMap"),valueArray,ArrayType(StringType,true)),StringType),array,ObjectType(class
>> [Ljava.lang.Object;)),true),false,ObjectType(class collector.MyMap),None))
>> - field (class: org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,
>> name: fromRowExpression, type: class
>> org.apache.spark.sql.catalyst.expressions.Expression) - object (class
>> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder,
>> class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map]) - field
>> (class: org.apache.spark.sql.execution.MapPartitions, name: uEncoder, type:
>> class org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) - object
>> (class org.apache.spark.sql.execution.MapPartitions, !MapPartitions ,
>> class[a[0]: string, b[0]: string],
>> class[map#ExprId(9,255a02aa-f2fa-482d-8cd1-63e2d4d08b30): map], [map#13] +-
>> LocalTableScan [a#2,b#3],
>> [[0,180000000a,2800000005,2d35302d35313032,3130,3161746164],[0,180000000a,2800000005,2d35302d35313032,3130,3261746164]]
>> ) - field (class: org.apache.spark.sql.execution.MapPartitions$$anonfun$8,
>> name: $outer, type: class org.apache.spark.sql.execution.MapPartitions) -
>> object (class org.apache.spark.sql.execution.MapPartitions$$anonfun$8, ) -
>> field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,
>> name: f$22, type: interface scala.Function1) - object (class
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1, ) - field
>> (class:
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,
>> name: $outer, type: class
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1) - object (class
>> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21,
>> ) - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type:
>> interface scala.Function3) - object (class
>> org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[1] at show at
>> CollectorSparkTest.scala:50) - field (class:
>> org.apache.spark.NarrowDependency, name: rdd, type: class
>> org.apache.spark.rdd.RDD) - object (class
>> org.apache.spark.OneToOneDependency,
>> org.apache.spark.OneToOneDependency@110f15b7) - writeObject data (class:
>> scala.collection.immutable.List$SerializationProxy) - object (class
>> scala.collection.immutable.List$SerializationProxy,
>> scala.collection.immutable.List$SerializationProxy@6bb23696) -
>> writeReplace data (class:
>> scala.collection.immutable.List$SerializationProxy) - object (class
>> scala.collection.immutable.$colon$colon,
>> List(org.apache.spark.OneToOneDependency@110f15b7)) - field (class:
>> org.apache.spark.rdd.RDD, name: org$apache$spark$rdd$RDD$$dependencies,
>> type: interface scala.collection.Seq) - object (class
>> org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[2] at show at
>> CollectorSparkTest.scala:50) - field (class: scala.Tuple2, name: _1, type:
>> class java.lang.Object) - object (class scala.Tuple2, (MapPartitionsRDD[2]
>> at show at CollectorSparkTest.scala:50,)) at
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
>> at
>> org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1010)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
>> at
>> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:861)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1607)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
>> at
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at
>> org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at
>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at
>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at
>> org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at
>> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
>> at
>> org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
>> at
>> org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
>> at
>> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>> at
>> org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538)
>> at
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
>> at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125)
>> at 
>> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537)
>> at 
>> org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544)
>> at
>> org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414)
>> at
>> org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413)
>> at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at
>> org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) at
>> org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) at
>> org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) at
>> org.apache.spark.sql.DataFrame.show(DataFrame.scala:394) at
>> org.apache.spark.sql.Dataset.show(Dataset.scala:228) at
>> org.apache.spark.sql.Dataset.show(Dataset.scala:192) at
>> org.apache.spark.sql.Dataset.show(Dataset.scala:200)
>>
>> On Tue, Jan 12, 2016 at 10:39 AM, Michael Armbrust <
>> mich...@databricks.com> wrote:
>>
>>>> df1.as[TestCaseClass].map(_.toMyMap).show() //fails
>>>>
>>> This looks like a bug.  What is the error?  It might be fixed in
>>> branch-1.6/master if you can test there.
>>>
>>>> Please advise on what I may be missing here.
>>>>
>>>>
>>>> Also, for joins, may I suggest having a custom encoder / transformation
>>>> that says how two datasets are merged?
>>>> Also, when a join is made using something like 'left outer join', the
>>>> right-side object should ideally be an Option (similar to what's seen in
>>>> the RDD API). I think this would make it strongly typed.
>>>>
>>>
>>> I think you can actually use as to convert this to an Option if you'd
>>> like type safety.  The problem is that at compile time we don't know if
>>> it's an inner or outer join.
>>>
>>>
>>
>>
>
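
The compile failure quoted above comes down to a static-type mismatch: the generated code holds the decoded value as a scala.collection.Map (primitive3), while the only constructor of collector.MyMap takes a scala.collection.immutable.Map. A minimal plain-Scala sketch of the same mismatch, with no Spark involved, might look like the following. MyMap here is a hypothetical stand-in for the class in the thread, and decoded and rebuild are names made up for illustration; this is not a description of how SPARK-12783 was resolved.

```scala
import scala.collection.{Map => GeneralMap}
import scala.collection.immutable.{Map => ImmutableMap}

// Hypothetical stand-in for collector.MyMap from the thread.
final case class MyMap(map: ImmutableMap[String, String])

object MapMismatchSketch {
  // Mimics the generated code: the value is only known as the general Map trait.
  def decoded(): GeneralMap[String, String] = ImmutableMap("a" -> "1", "b" -> "2")

  // MyMap(decoded()) does not compile (type mismatch), which mirrors the
  // "No applicable constructor/method found" error in the log. Copying into
  // an immutable Map first makes the constructor call type-check.
  def rebuild(m: GeneralMap[String, String]): MyMap = MyMap(m.toMap)
}
```

Declaring the case class field as scala.collection.Map instead would presumably also satisfy the constructor lookup, but that is an untested assumption against the Spark versions discussed here.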

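The suggestion in the thread to have a separate method per join kind, as the RDD API does with join and leftOuterJoin, can be illustrated on plain Scala sequences. This is only a sketch of the typing idea, assuming in-memory Seq inputs; TypedJoinSketch, innerJoin and leftOuterJoin are illustrative names and not part of the Dataset API.

```scala
object TypedJoinSketch {
  // Inner join: both sides are always present, so no Option is needed.
  def innerJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, B))] =
    for {
      l <- left
      r <- right
      if l._1 == r._1
    } yield (l._1, (l._2, r._2))

  // Left outer join: the right side may be missing, and the return type says so.
  def leftOuterJoin[K, A, B](left: Seq[(K, A)], right: Seq[(K, B)]): Seq[(K, (A, Option[B]))] = {
    // Group the right-hand values by key once, so each left row is a lookup.
    val byKey: Map[K, Seq[B]] =
      right.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
    left.flatMap { case (k, a) =>
      byKey.get(k) match {
        case Some(bs) => bs.map(b => (k, (a, Some(b): Option[B])))
        case None     => Seq((k, (a, Option.empty[B])))
      }
    }
  }
}
```

Because the join kind is fixed by the method that is called, the compiler knows statically whether the right-hand value is a B or an Option[B], which is the information a single join method taking the join type as a runtime argument cannot provide.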