[ 
https://issues.apache.org/jira/browse/SPARK-18711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15724115#comment-15724115
 ] 

koert kuipers commented on SPARK-18711:
---------------------------------------

Confirmed that it resolved the issue for me. Thanks.

> NPE in generated SpecificMutableProjection for Aggregator
> ---------------------------------------------------------
>
>                 Key: SPARK-18711
>                 URL: https://issues.apache.org/jira/browse/SPARK-18711
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: koert kuipers
>            Assignee: Wenchen Fan
>             Fix For: 2.1.0
>
>
> This is a bug in branch-2.1, but I don't think it was in 2.1.0-rc1.
> Code (contrived, but based on real code we run):
> {noformat}
>   case class Holder(i: Int)
>   val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, Int)]] {
>     def zero: Tuple1[Option[Holder]] = {
>       val x = Tuple1(None)
>       println(s"zero ${x}")
>       x
>     }
>     def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
>       println(s"reduce ${b} ${a}")
>       Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
>     }
>     def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): Tuple1[Option[Holder]] = {
>       println(s"merge ${b1} ${b2}")
>       (b1._1, b2._1) match {
>         case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + i2)))
>         case (Some(Holder(i1)), _) => Tuple1(Some(Holder(i1)))
>         case (_, Some(Holder(i2))) => Tuple1(Some(Holder(i2)))
>         case _ => Tuple1(None)
>       }
>     }
>     def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
>       println(s"finish ${reduction}")
>       Seq(("ha", reduction._1.get.i, 0))
>     }
>     def bufferEncoder: Encoder[Tuple1[Option[Holder]]] = ExpressionEncoder[Tuple1[Option[Holder]]]()
>     def outputEncoder: Encoder[Seq[(String, Int, Int)]] = ExpressionEncoder[Seq[(String, Int, Int)]]()
>   }
>   val x = Seq(("a", 1), ("a", 2))
>     .toDS
>     .groupByKey(_._1)
>     .mapValues(_._2)
>     .agg(agg1.toColumn)
>   x.printSchema
>   x.show
> {noformat}
> Result:
> {noformat}
> org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 423)
> java.lang.NullPointerException
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown Source)
>       at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown Source)
>       at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
>       at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
>       at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
>       at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
>       at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:99)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> The error seems to be in the code generation for the aggregator result.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to