[ 
https://issues.apache.org/jira/browse/SPARK-18711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

koert kuipers updated SPARK-18711:
----------------------------------
    Description: 
This is a bug in branch-2.1, but I don't think it was present in 2.1.0-rc1.
code:
code:
{noformat}
  case class Holder(i: Int)

  val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, 
Int)]] {
    def zero: Tuple1[Option[Holder]] = {
      val x = Tuple1(None)
      println(s"zero ${x}")
      x
    }

    def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
      println(s"reduce ${b} ${a}")
      Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
    }

    def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): 
Tuple1[Option[Holder]] = {
      println(s"merge ${b1} ${b2}")
      (b1._1, b2._1) match {
        case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + 
i2)))
        case (Some(Holder(i1)), _) => Tuple1(Some(Holder(i1)))
        case (_, Some(Holder(i2))) => Tuple1(Some(Holder(i2)))
        case _ => Tuple1(None)
      }
    }

    def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
      println(s"finish ${reduction}")
      Seq(("ha", reduction._1.get.i, 0))
    }

    def bufferEncoder: Encoder[Tuple1[Option[Holder]]] = 
ExpressionEncoder[Tuple1[Option[Holder]]]()

    def outputEncoder: Encoder[Seq[(String, Int, Int)]] = 
ExpressionEncoder[Seq[(String, Int, Int)]]()
  }

  val x = Seq(("a", 1), ("a", 2))
    .toDS
    .groupByKey(_._1)
    .mapValues(_._2)
    .agg(agg1.toColumn)
  x.printSchema
  x.show
{noformat}

result is:
{noformat}
org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 
423)
java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
 Source)
        at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
        at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
        at 
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
        at 
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

The error appears to be in the code generation for the aggregator result.

  was:
This is a bug in branch-2.1, but I don't think it was present in 2.1.0-rc1.
code:
code:
{noformat}
  val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, 
Int)]] {
    def zero: Tuple1[Option[Holder]] = {
      val x = Tuple1(None)
      println(s"zero ${x}")
      x
    }

    def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
      println(s"reduce ${b} ${a}")
      Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
    }

    def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): 
Tuple1[Option[Holder]] = {
      println(s"merge ${b1} ${b2}")
      (b1._1, b2._1) match {
        case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + 
i2)))
        case (Some(Holder(i1)), _) => Tuple1(Some(Holder(i1)))
        case (_, Some(Holder(i2))) => Tuple1(Some(Holder(i2)))
        case _ => Tuple1(None)
      }
    }

    def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
      println(s"finish ${reduction}")
      Seq(("ha", reduction._1.get.i, 0))
    }

    def bufferEncoder: Encoder[Tuple1[Option[Holder]]] = 
ExpressionEncoder[Tuple1[Option[Holder]]]()

    def outputEncoder: Encoder[Seq[(String, Int, Int)]] = 
ExpressionEncoder[Seq[(String, Int, Int)]]()
  }

  val x = Seq(("a", 1), ("a", 2))
    .toDS
    .groupByKey(_._1)
    .mapValues(_._2)
    .agg(agg1.toColumn)
  x.printSchema
  x.show
{noformat}

result is:
{noformat}
org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 
423)
java.lang.NullPointerException
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown
 Source)
        at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
 Source)
        at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
        at 
org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
        at 
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
        at 
org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
        at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
{noformat}

The error appears to be in the code generation for the aggregator result.


> NPE in generated SpecificMutableProjection for Aggregator
> ---------------------------------------------------------
>
>                 Key: SPARK-18711
>                 URL: https://issues.apache.org/jira/browse/SPARK-18711
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0
>            Reporter: koert kuipers
>
> this is a bug in the branch-2.1, but i don't think it was in 2.1.0-rc1
> code:
> {noformat}
>   case class Holder(i: Int)
>   val agg1 = new Aggregator[Int, Tuple1[Option[Holder]], Seq[(String, Int, 
> Int)]] {
>     def zero: Tuple1[Option[Holder]] = {
>       val x = Tuple1(None)
>       println(s"zero ${x}")
>       x
>     }
>     def reduce(b: Tuple1[Option[Holder]], a: Int): Tuple1[Option[Holder]] = {
>       println(s"reduce ${b} ${a}")
>       Tuple1(Some(Holder(b._1.map(_.i + a).getOrElse(a))))
>     }
>     def merge(b1: Tuple1[Option[Holder]], b2: Tuple1[Option[Holder]]): 
> Tuple1[Option[Holder]] = {
>       println(s"merge ${b1} ${b2}")
>       (b1._1, b2._1) match {
>         case (Some(Holder(i1)), Some(Holder(i2))) => Tuple1(Some(Holder(i1 + 
> i2)))
>         case (Some(Holder(i1)), _) => Tuple1(Some(Holder(i1)))
>         case (_, Some(Holder(i2))) => Tuple1(Some(Holder(i2)))
>         case _ => Tuple1(None)
>       }
>     }
>     def finish(reduction: Tuple1[Option[Holder]]): Seq[(String, Int, Int)] = {
>       println(s"finish ${reduction}")
>       Seq(("ha", reduction._1.get.i, 0))
>     }
>     def bufferEncoder: Encoder[Tuple1[Option[Holder]]] = 
> ExpressionEncoder[Tuple1[Option[Holder]]]()
>     def outputEncoder: Encoder[Seq[(String, Int, Int)]] = 
> ExpressionEncoder[Seq[(String, Int, Int)]]()
>   }
>   val x = Seq(("a", 1), ("a", 2))
>     .toDS
>     .groupByKey(_._1)
>     .mapValues(_._2)
>     .agg(agg1.toColumn)
>   x.printSchema
>   x.show
> {noformat}
> result is:
> {noformat}
> org.apache.spark.executor.Executor: Exception in task 1.0 in stage 146.0 (TID 
> 423)
> java.lang.NullPointerException
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply_0$(Unknown
>  Source)
>       at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection.apply(Unknown
>  Source)
>       at 
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:223)
>       at 
> org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$generateResultProjection$1.apply(AggregationIterator.scala:221)
>       at 
> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:159)
>       at 
> org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>       at org.apache.spark.scheduler.Task.run(Task.scala:99)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> the error seems to be in the code generation for the aggregator result.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to