[ 
https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720610#comment-15720610
 ] 

koert kuipers commented on SPARK-15810:
---------------------------------------

here is an example where (None,) gets read back in as (null,) incorrectly:

{noformat}
  def agg[A, B, C](prepare: A => B, plus: (B, B) => B, present: B => 
C)(implicit bEncoder: Encoder[Option[B]], cEncoder: Encoder[C]): Aggregator[A, 
Tuple1[Option[B]], C] = 
    new Aggregator[A, Tuple1[Option[B]], C] {
      def zero: Tuple1[Option[B]] = {
        val x = Tuple1(None)
        println(s"zero ${x}")
        x
      }

      def reduce(maybeB: Tuple1[Option[B]], a: A): Tuple1[Option[B]] = {
        println(s"reduce ${maybeB} and ${a}")
        merge(maybeB, Tuple1(Option(prepare(a))))
      }

      def merge(maybeB1: Tuple1[Option[B]], maybeB2: Tuple1[Option[B]]): 
Tuple1[Option[B]] = {
        println(s"merge ${maybeB1} and ${maybeB2}")
        Tuple1(maybeB1._1.map(b1 => maybeB2._1.map(b2 => plus(b1, 
b2)).getOrElse(b1)).orElse(maybeB2._1))
      }

      def finish(maybeB: Tuple1[Option[B]]): C = 
maybeB._1.map(present).getOrElse(sys.error("need zero for empty data"))

      val bufferEncoder: Encoder[Tuple1[Option[B]]] = 
ExpressionEncoder.tuple(encoderFor(bEncoder))

      val outputEncoder: Encoder[C] = cEncoder
    }

  val agg1 = agg(
    { x: Int => x },
    { (y1: Int, y2: Int) => y1 + y2 },
    { y: Int => y }
  )(ExpressionEncoder(), ExpressionEncoder())

  val x = Seq(("a", 1), ("a", 2))
    .toDS
    .groupByKey(_._1)
    .mapValues(_._2)
    .agg(agg1.toColumn)
  x.printSchema
  x.show
{noformat}

the result is:
{noformat}
root
 |-- value: string (nullable = true)
 |-- anon$1(int): integer (nullable = false)

zero (None)
zero (None)
reduce (null) and 1
reduce (null) and 2
merge (null) and (Some(2))
merge (null) and (Some(1))
org.apache.spark.executor.Executor: Exception in task 1.0 in stage 143.0 (TID 
403)
java.lang.NullPointerException
...
{noformat}
the NPE is in the merge method because it comes in as (null,) while i expect a 
Tuple1[Option[Int]] and i try to map over the option.

> Aggregator doesn't play nice with Option
> ----------------------------------------
>
>                 Key: SPARK-15810
>                 URL: https://issues.apache.org/jira/browse/SPARK-15810
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>         Environment: spark 2.0.0-SNAPSHOT
>            Reporter: koert kuipers
>
> {code}
> val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS
> val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) }
> val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), 
> Option[Int], Option[Int]]{
>   def zero: Option[Int] = None
>   def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = 
> b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2)
>   def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => 
> b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2)
>   def finish(reduction: Option[Int]): Option[Int] = reduction
>   def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
>   def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]]
> }.toColumn)
> ds3.printSchema
> ds3.show
> {code}
> i get as output a somewhat odd looking schema, and after that the program 
> just hangs pinning one cpu at 100%. the data never shows.
> output:
> {noformat}
> root
>  |-- value: string (nullable = true)
>  |-- $anon$1(scala.Tuple2): struct (nullable = true)
>  |    |-- value: integer (nullable = true)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to