scala> val a = sc.parallelize(Array((1,2),(3,4)))
a: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[243] at
parallelize at <console>:38

scala> val a_ds = hc.di.createDataFrame(a).as[(Long,Long)]
a_ds: org.apache.spark.sql.Dataset[(Long, Long)] = [_1: int, _2: int]

scala> a_ds.agg(typed.count[(Long,Long)](x=>x._1))
res34: org.apache.spark.sql.DataFrame =
[TypedCount(org.apache.spark.sql.Row): bigint]

scala> res34.show

Then it gave me the following error:

Caused by: java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2

    at $anonfun$1.apply(<console>:46)
    at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:69)
    at org.apache.spark.sql.execution.aggregate.TypedCount.reduce(typedaggregators.scala:66)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)


I had to add a groupByKey():
scala> a_ds.groupByKey(k=>k._1).agg(typed.count[(Long,Long)](_._1)).show
+-----+------------------------+
|value|TypedCount(scala.Tuple2)|
+-----+------------------------+
|    1|                       1|
|    3|                       1|
+-----+------------------------+

But why does adding groupByKey() make any difference? This looks like a bug.

Reply via email to