Hi,

I have a dataset of (ID, Timestamp) tuples. I want to group by ID and then,
for each ID, build a map of frequency per hour.

Input:
1| 20160106061005
1| 20160106061515
1| 20160106064010
1| 20160106050402
1| 20160106040101
2| 20160106040101
3| 20160106051451

Expected Output:
1|{2016010604:1, 2016010605:1, 2016010606:3}
2|{2016010604:1}
3|{2016010605:1}
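
The hour key is just the yyyyMMddHH prefix of the yyyyMMddHHmmss timestamp,
i.e. in Java:

    String hour = "20160106061005".substring(0, 10); // -> "2016010606"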

As I could not find a function in the org.apache.spark.sql.functions library
that can do this aggregation, I wrote a UDAF. I am using the Dataset API from
Spark 2.0 with the Java library; the code, along with the test data, is
attached.
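
In outline, the UDAF looks like the sketch below (a simplified reconstruction
for the list, not the attached code verbatim; the class name HourlyFrequency
and the method bodies are illustrative):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Counts occurrences per hour bucket; buffer and result are Map<String, Integer>.
public class HourlyFrequency extends UserDefinedAggregateFunction {

    @Override
    public StructType inputSchema() {
        return new StructType().add("ts", DataTypes.StringType);
    }

    @Override
    public StructType bufferSchema() {
        return new StructType().add("freq",
            DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
    }

    @Override
    public DataType dataType() {
        return DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType);
    }

    @Override
    public boolean deterministic() {
        return true;
    }

    @Override
    public void initialize(MutableAggregationBuffer buffer) {
        buffer.update(0, new HashMap<String, Integer>());
    }

    @Override
    public void update(MutableAggregationBuffer buffer, Row input) {
        // Hour bucket = yyyyMMddHH prefix; trim() handles the stray space in the sample data.
        String hour = input.getString(0).trim().substring(0, 10);
        Map<String, Integer> freq = new HashMap<>(buffer.<String, Integer>getJavaMap(0));
        freq.merge(hour, 1, Integer::sum);
        buffer.update(0, freq);
    }

    @Override
    public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
        // Sum the per-hour counts of the two partial buffers.
        Map<String, Integer> merged = new HashMap<>(buffer1.<String, Integer>getJavaMap(0));
        for (Map.Entry<String, Integer> e : buffer2.<String, Integer>getJavaMap(0).entrySet()) {
            merged.merge(e.getKey(), e.getValue(), Integer::sum);
        }
        buffer1.update(0, merged);
    }

    @Override
    public Object evaluate(Row buffer) {
        return buffer.getJavaMap(0);
    }
}

When I execute it, it throws the exception below.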

scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
        at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
        at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
        at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
        at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
17/01/25 18:20:58 INFO Executor: Executor is trying to kill task 29.0 in stage 7.0 (TID 398)
17/01/25 18:20:58 INFO DAGScheduler: ResultStage 7 (show at EdgeAggregator.java:29) failed in 0.699 s
17/01/25 18:20:58 INFO DAGScheduler: Job 3 failed: show at EdgeAggregator.java:29, took 0.712912 s
In merge hr: 2016010606
17/01/25 18:20:58 WARN TaskSetManager: Lost task 29.0 in stage 7.0 (TID 398, localhost): scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
        [same stack trace as above]


Attached is the code that you can use to reproduce the error.
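
In case the attachment gets stripped, the driver side looks roughly like
this (again a self-contained sketch rather than the attached
EdgeAggregator.java verbatim; it uses the illustrative HourlyFrequency class
from above):

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class EdgeAggregatorDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
            .appName("EdgeAggregatorDemo").master("local[*]").getOrCreate();

        // The sample data from above, as (id, ts) rows.
        List<Row> rows = Arrays.asList(
            RowFactory.create("1", "20160106061005"),
            RowFactory.create("1", "20160106061515"),
            RowFactory.create("1", "20160106064010"),
            RowFactory.create("1", "20160106050402"),
            RowFactory.create("1", "20160106040101"),
            RowFactory.create("2", "20160106040101"),
            RowFactory.create("3", "20160106051451"));

        StructType schema = new StructType()
            .add("id", DataTypes.StringType)
            .add("ts", DataTypes.StringType);

        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Register the UDAF and aggregate per ID.
        spark.udf().register("hourlyFreq", new HourlyFrequency());
        df.groupBy("id")
          .agg(functions.callUDF("hourlyFreq", df.col("ts")).alias("freqPerHour"))
          .show(false);
    }
}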

Thanks
Ankur

Attachment: EdgeAggregator.java
