Thank you, Takeshi, that was the issue.

Thanks
Ankur

Sent from my iPhone

> On Jan 25, 2017, at 9:08 PM, Takeshi Yamamuro <linguin....@gmail.com> wrote:
> 
> Hi,
> 
> Quickly looking over the attachment, I found that you declared the wrong dataType
> for your aggregator's output at line 70.
> You need to return a `MapType` there instead of a `StructType`.
> The stack trace you posted points directly at this type mismatch.
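> 
> As a minimal sketch (only the Spark API names are real, the rest is illustrative),
> the corrected declaration looks like:
> 
>   @Override
>   public DataType dataType() {
>     // Must match what evaluate() actually returns, a java.util.Map,
>     // so declare a MapType rather than a StructType.
>     return DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType);
>   }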
> 
> // maropu
> 
> 
>> On Thu, Jan 26, 2017 at 12:07 PM, Ankur Srivastava <ankur.srivast...@gmail.com> wrote:
>> Hi,
>> 
>> I have a dataset of (ID, timestamp) tuples. I want to group by ID and then, for
>> each ID, build a map of frequency per hour.
>> 
>> Input:
>> 1| 20160106061005
>> 1| 20160106061515
>> 1| 20160106064010
>> 1| 20160106050402
>> 1| 20160106040101
>> 2| 20160106040101
>> 3| 20160106051451
>> 
>> Expected Output:
>> 1|{2016010604:1, 2016010605:1, 2016010606:3}
>> 2|{2016010604:1}
>> 3|{2016010605:1}
>> 
>> As I could not find a function in the org.apache.spark.sql.functions library
>> that can do this aggregation, I wrote a UDAF, but when I execute it, it throws
>> the exception below.
>> 
>> I am using the Dataset API from Spark 2.0 with the Java library. The code and
>> test data are attached.
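>> 
>> For reference, the aggregator follows the standard UserDefinedAggregateFunction
>> shape, roughly as below (trimmed, with illustrative class and column names; the
>> attachment has the full code and test data):
>> 
>>   import java.util.HashMap;
>>   import java.util.Map;
>>   import org.apache.spark.sql.Row;
>>   import org.apache.spark.sql.expressions.MutableAggregationBuffer;
>>   import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
>>   import org.apache.spark.sql.types.DataType;
>>   import org.apache.spark.sql.types.DataTypes;
>>   import org.apache.spark.sql.types.StructType;
>> 
>>   // Aggregates timestamps into per-hour counts (yyyyMMddHH -> count).
>>   public class HourlyFrequencyUDAF extends UserDefinedAggregateFunction {
>> 
>>     @Override
>>     public StructType inputSchema() {
>>       return new StructType().add("timestamp", DataTypes.StringType);
>>     }
>> 
>>     @Override
>>     public StructType bufferSchema() {
>>       return new StructType().add("counts",
>>           DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
>>     }
>> 
>>     @Override
>>     public DataType dataType() {
>>       // evaluate() returns a java.util.Map while this declares a StructType;
>>       // that mismatch is exactly what the MatchError below complains about.
>>       return new StructType().add("counts",
>>           DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType));
>>     }
>> 
>>     @Override
>>     public boolean deterministic() { return true; }
>> 
>>     @Override
>>     public void initialize(MutableAggregationBuffer buffer) {
>>       buffer.update(0, new HashMap<String, Integer>());
>>     }
>> 
>>     @Override
>>     public void update(MutableAggregationBuffer buffer, Row input) {
>>       // Bucket by the yyyyMMddHH prefix of the timestamp string.
>>       String hour = input.getString(0).substring(0, 10);
>>       Map<String, Integer> counts =
>>           new HashMap<>(buffer.<String, Integer>getJavaMap(0));
>>       counts.merge(hour, 1, Integer::sum);
>>       buffer.update(0, counts);
>>     }
>> 
>>     @Override
>>     public void merge(MutableAggregationBuffer buffer, Row other) {
>>       Map<String, Integer> merged =
>>           new HashMap<>(buffer.<String, Integer>getJavaMap(0));
>>       for (Map.Entry<String, Integer> e :
>>           other.<String, Integer>getJavaMap(0).entrySet()) {
>>         merged.merge(e.getKey(), e.getValue(), Integer::sum);
>>       }
>>       buffer.update(0, merged);
>>     }
>> 
>>     @Override
>>     public Object evaluate(Row buffer) {
>>       return buffer.getJavaMap(0);
>>     }
>>   }
>> 
>> and it is invoked along the lines of:
>> 
>>   // col is org.apache.spark.sql.functions.col
>>   df.groupBy("id")
>>       .agg(new HourlyFrequencyUDAF().apply(col("timestamp")).as("freq"))
>>       .show();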
>> 
>> scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:256)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:251)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
>>   at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:403)
>>   at org.apache.spark.sql.execution.aggregate.ScalaUDAF.eval(udaf.scala:440)
>>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:228)
>>   at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:220)
>>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
>>   at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:247)
>>   at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>   at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:784)
>>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
>>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
>>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
>>   at org.apache.spark.scheduler.Task.run(Task.scala:85)
>>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   at java.lang.Thread.run(Thread.java:745)
>> 17/01/25 18:20:58 INFO Executor: Executor is trying to kill task 29.0 in stage 7.0 (TID 398)
>> 17/01/25 18:20:58 INFO DAGScheduler: ResultStage 7 (show at EdgeAggregator.java:29) failed in 0.699 s
>> 17/01/25 18:20:58 INFO DAGScheduler: Job 3 failed: show at EdgeAggregator.java:29, took 0.712912 s
>> In merge hr: 2016010606
>> 17/01/25 18:20:58 WARN TaskSetManager: Lost task 29.0 in stage 7.0 (TID 398, localhost): scala.MatchError: {2016010606=1} (of class scala.collection.convert.Wrappers$MapWrapper)
>> 
>> 
>> Attached is the code that you can use to reproduce the error.
>> 
>> Thanks
>> Ankur
>> 
>> 
> 
> 
> 
> -- 
> ---
> Takeshi Yamamuro
