[ https://issues.apache.org/jira/browse/SPARK-15810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15720610#comment-15720610 ]
koert kuipers commented on SPARK-15810: --------------------------------------- here is an example where (None,) gets read back in as (null,) incorrectly: {noformat} def agg[A, B, C](prepare: A => B, plus: (B, B) => B, present: B => C)(implicit bEncoder: Encoder[Option[B]], cEncoder: Encoder[C]): Aggregator[A, Tuple1[Option[B]], C] = new Aggregator[A, Tuple1[Option[B]], C] { def zero: Tuple1[Option[B]] = { val x = Tuple1(None) println(s"zero ${x}") x } def reduce(maybeB: Tuple1[Option[B]], a: A): Tuple1[Option[B]] = { println(s"reduce ${maybeB} and ${a}") merge(maybeB, Tuple1(Option(prepare(a)))) } def merge(maybeB1: Tuple1[Option[B]], maybeB2: Tuple1[Option[B]]): Tuple1[Option[B]] = { println(s"merge ${maybeB1} and ${maybeB2}") Tuple1(maybeB1._1.map(b1 => maybeB2._1.map(b2 => plus(b1, b2)).getOrElse(b1)).orElse(maybeB2._1)) } def finish(maybeB: Tuple1[Option[B]]): C = maybeB._1.map(present).getOrElse(sys.error("need zero for empty data")) val bufferEncoder: Encoder[Tuple1[Option[B]]] = ExpressionEncoder.tuple(encoderFor(bEncoder)) val outputEncoder: Encoder[C] = cEncoder } val agg1 = agg( { x: Int => x }, { (y1: Int, y2: Int) => y1 + y2 }, { y: Int => y } )(ExpressionEncoder(), ExpressionEncoder()) val x = Seq(("a", 1), ("a", 2)) .toDS .groupByKey(_._1) .mapValues(_._2) .agg(agg1.toColumn) x.printSchema x.show {noformat} the result is: {noformat} root |-- value: string (nullable = true) |-- anon$1(int): integer (nullable = false) zero (None) zero (None) reduce (null) and 1 reduce (null) and 2 merge (null) and (Some(2)) merge (null) and (Some(1)) org.apache.spark.executor.Executor: Exception in task 1.0 in stage 143.0 (TID 403) java.lang.NullPointerException ... {noformat} the NPE is in the merge method because it comes in as (null,) while i expect a Tuple1[Option[Int]] and i try to map over the option. > Aggregator doesn't play nice with Option > ---------------------------------------- > > Key: SPARK-15810 > URL: https://issues.apache.org/jira/browse/SPARK-15810 > Project: Spark > Issue Type: Sub-task > Components: SQL > Environment: spark 2.0.0-SNAPSHOT > Reporter: koert kuipers > > {code} > val ds1 = List(("a", 1), ("a", 2), ("a", 3)).toDS > val ds2 = ds1.map{ case (k, v) => (k, if (v > 1) Some(v) else None) } > val ds3 = ds2.groupByKey(_._1).agg(new Aggregator[(String, Option[Int]), > Option[Int], Option[Int]]{ > def zero: Option[Int] = None > def reduce(b: Option[Int], a: (String, Option[Int])): Option[Int] = > b.map(bv => a._2.map(av => bv + av).getOrElse(bv)).orElse(a._2) > def merge(b1: Option[Int], b2: Option[Int]): Option[Int] = b1.map(b1v => > b2.map(b2v => b1v + b2v).getOrElse(b1v)).orElse(b2) > def finish(reduction: Option[Int]): Option[Int] = reduction > def bufferEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] > def outputEncoder: Encoder[Option[Int]] = implicitly[Encoder[Option[Int]]] > }.toColumn) > ds3.printSchema > ds3.show > {code} > i get as output a somewhat odd looking schema, and after that the program > just hangs pinning one cpu at 100%. the data never shows. > output: > {noformat} > root > |-- value: string (nullable = true) > |-- $anon$1(scala.Tuple2): struct (nullable = true) > | |-- value: integer (nullable = true) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org