Hi, I have a dataset with tuple of ID and Timestamp. I want to do a group by on ID and then create a map with frequency per hour for the ID.
Input: 1| 20160106061005 1| 20160106061515 1| 20160106064010 1| 20160106050402 1| 20160106040101 2| 20160106040101 3| 20160106051451 Expected Output: 1|{2016010604:1, 2016010605:1, 2016010606:3} 2|{2016010604:1} 3|{2016010605:1} As I could not find a function in org.apache.spark.sql.functions library that can do this aggregation I wrote a UDAF but when I execute it, it throws below exception. I am using Dataset API from Spark 2.0 and am using Java library. Also attached is the code with the test data. scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403) at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220) at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152) at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 17/01/25 18:20:58 INFO Executor: Executor is trying to kill task 29.0 in stage 7.0 (TID 398) 17/01/25 18:20:58 INFO DAGScheduler: ResultStage 7 (show at EdgeAggregator.java:29) failed in 0.699 s 17/01/25 18:20:58 INFO DAGScheduler: Job 3 failed: show at EdgeAggregator.java:29, took 0.712912 s In merge hr: 2016010606 17/01/25 18:20:58 WARN TaskSetManager: Lost task 29.0 in stage 7.0 (TID 398, localhost): scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403) at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228) at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220) at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152) at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247) at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Attached is the code that you can use to reproduce the error. Thanks Ankur
EdgeAggregator.java
Description: Binary data
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org