I even tried a dummy groupByKey with a constant key over the entire Dataset, and that works too:

scala> a_ds.groupByKey(k=>1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       2|
+-----+------------------------+
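A possible explanation, offered as a sketch rather than a confirmed diagnosis: the quoted output shows that Dataset.agg returns a DataFrame and names the column TypedCount(org.apache.spark.sql.Row), which suggests the untyped agg hands the lambda a Row instead of a tuple. Assuming a Spark 2.x build where Dataset.select accepts a TypedColumn, the whole-Dataset count can be written without any groupByKey. The object name, the helper countAll, and the local SparkSession below are made up for illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.expressions.scalalang.typed

// Hypothetical names throughout; only typed.count and Dataset.select come from Spark.
object TypedCountSketch {

  // Count every row with the typed aggregator, staying in the typed API.
  // select(TypedColumn) keeps the element type (Long, Long) as the aggregator input,
  // whereas Dataset.agg(Column*) goes through the untyped DataFrame path.
  def countAll(ds: Dataset[(Long, Long)]): Long = {
    import ds.sparkSession.implicits._ // supplies the Encoder[Long] for the result
    ds.select(typed.count[(Long, Long)](_._1)).head()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for trying this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("typed-count-sketch")
      .getOrCreate()
    import spark.implicits._

    // Same two rows as in the quoted example, built directly as Longs.
    val ds = Seq((1L, 2L), (3L, 4L)).toDS()
    println(countAll(ds))

    spark.stop()
  }
}
```

If this runs cleanly where a_ds.agg(...) fails, that would point at the untyped agg path rather than at TypedCount itself.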




On Tue, Oct 18, 2016 at 11:30 PM, Yang <teddyyyy...@gmail.com> wrote:

> scala> val a = sc.parallelize(Array((1,2),(3,4)))
> a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
> parallelize at <console>:38
>
> scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
> a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]
>
> scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
> res34: org.apache.spark.sql.DataFrame = [TypedCount(org.apache.spark.sql.Row):
> bigint]
>
> scala> res34.show
>
> Then it gave me the following error:
>
> Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
>   at $anonfun$1.apply(<console>:46)
>   at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)
>   at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
>   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>   at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>   at org.apache.spark.scheduler.Task.run(Task.scala:86)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
>
>
> To get it to work, I had to add a groupByKey():
> scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
> +-----+------------------------+
> |value|TypedCount(scala.Tuple2)|
> +-----+------------------------+
> |    1|                       1|
> |    3|                       1|
> +-----+------------------------+
>
> But why does adding groupByKey() make any difference? This looks like a bug.
>
>
>
