Thank you Takeshi, that was the issue.

Thanks
Ankur
Sent from my iPhone

> On Jan 25, 2017, at 9:08 PM, Takeshi Yamamuro <linguin....@gmail.com> wrote:
>
> Hi,
>
> Quickly looking at the attached code, I found you passed the wrong dataType
> for your aggregator output on line 70: you need to at least return `MapType`
> instead of `StructType`. The stack trace you posted points directly at this
> type mismatch.
>
> // maropu
>
>> On Thu, Jan 26, 2017 at 12:07 PM, Ankur Srivastava
>> <ankur.srivast...@gmail.com> wrote:
>> Hi,
>>
>> I have a dataset of (ID, timestamp) tuples. I want to group by ID and then
>> build a map of frequency per hour for each ID.
>>
>> Input:
>> 1| 20160106061005
>> 1| 20160106061515
>> 1| 20160106064010
>> 1| 20160106050402
>> 1| 20160106040101
>> 2| 20160106040101
>> 3| 20160106051451
>>
>> Expected Output:
>> 1|{2016010604:1, 2016010605:1, 2016010606:3}
>> 2|{2016010604:1}
>> 3|{2016010605:1}
>>
>> As I could not find a function in the org.apache.spark.sql.functions
>> library that can do this aggregation, I wrote a UDAF, but when I execute it,
>> it throws the exception below.
>>
>> I am using the Dataset API from Spark 2.0 with the Java library. Also
>> attached is the code with the test data.
>>
>> scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>>   at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
>>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
>>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220)
>>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
>>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> 17/01/25 18:20:58 INFO Executor: Executor is trying to kill task 29.0 in stage 7.0 (TID 398)
>> 17/01/25 18:20:58 INFO DAGScheduler: ResultStage 7 (show at EdgeAggregator.java:29) failed in 0.699 s
>> 17/01/25 18:20:58 INFO DAGScheduler: Job 3 failed: show at EdgeAggregator.java:29, took 0.712912 s
>> In merge hr: 2016010606
>> 17/01/25 18:20:58 WARN TaskSetManager: Lost task 29.0 in stage 7.0 (TID 398, localhost): scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>>   at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
>>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
>>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220)
>>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
>>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>>
>> Attached is the code that you can use to reproduce the error.
>>
>> Thanks
>> Ankur
>>
>> ---------------------------------------------------------------------
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
> --
> ---
> Takeshi Yamamuro
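The aggregation the thread describes — group by ID, then count events per hour bucket, where the bucket is the first ten digits (yyyyMMddHH) of a yyyyMMddHHmmss timestamp — can be sketched in plain Java, independent of the Spark UDAF. The class and method names below are illustrative assumptions, not the code attached to the thread:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class HourlyFrequency {

    // Hour bucket: the first 10 chars of a yyyyMMddHHmmss timestamp (yyyyMMddHH).
    static String hourBucket(String timestamp) {
        return timestamp.substring(0, 10);
    }

    // Group rows of the form "id| timestamp" by id, then count per hour bucket.
    static Map<String, Map<String, Long>> aggregate(List<String> rows) {
        return rows.stream()
                .map(row -> row.split("\\|"))
                .collect(Collectors.groupingBy(
                        parts -> parts[0].trim(),
                        Collectors.groupingBy(
                                parts -> hourBucket(parts[1].trim()),
                                Collectors.counting())));
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList(
                "1| 20160106061005", "1| 20160106061515", "1| 20160106064010",
                "1| 20160106050402", "1| 20160106040101",
                "2| 20160106040101", "3| 20160106051451");
        System.out.println(aggregate(rows));
    }
}
```

Running this on the sample input reproduces the expected output from the thread, e.g. ID 1 maps to counts 1, 1, and 3 for the buckets 2016010604, 2016010605, and 2016010606. Inside a UDAF the per-ID inner map would be the buffer, and the fix Takeshi points at is declaring that output as a `MapType` (string keys, integer values) rather than a `StructType`.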