[jira] [Commented] (SPARK-44323) Scala None shows up as null for Aggregator BUF or OUT
[ https://issues.apache.org/jira/browse/SPARK-44323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17741400#comment-17741400 ]

koert kuipers commented on SPARK-44323:
---------------------------------------

Not sure why the pull request isn't getting linked automatically, but it is here: https://github.com/apache/spark/pull/41903

> Scala None shows up as null for Aggregator BUF or OUT
> -----------------------------------------------------
>
>                 Key: SPARK-44323
>                 URL: https://issues.apache.org/jira/browse/SPARK-44323
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: koert kuipers
>            Priority: Major
>
> When upgrading from Spark 3.3.1 to Spark 3.4.1 we suddenly started getting
> null pointer exceptions in Aggregators (classes extending
> org.apache.spark.sql.expressions.Aggregator) that use Scala Option for BUF
> and/or OUT. Basically, None now shows up as null.
>
> After adding a simple test case and doing a binary search on commits, we
> landed on SPARK-37829 as the cause.
>
> We first observed the issue as an NPE inside Aggregator.merge because None
> was null. I am having a hard time replicating that in a Spark unit test,
> but I did manage to get a None to become null in the output.
> A simple test that now fails:
>
> {code:java}
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> index e9daa825dd4..a1959d7065d 100644
> --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> @@ -228,6 +228,16 @@ case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
>    def outputEncoder: Encoder[Int] = Encoders.scalaInt
>  }
>  
> +object OptionStringAgg extends Aggregator[Option[String], Option[String], Option[String]] {
> +  override def zero: Option[String] = None
> +  override def reduce(b: Option[String], a: Option[String]): Option[String] = merge(b, a)
> +  override def finish(reduction: Option[String]): Option[String] = reduction
> +  override def merge(b1: Option[String], b2: Option[String]): Option[String] =
> +    b1.map { b1v => b2.map { b2v => b1v ++ b2v }.getOrElse(b1v) }.orElse(b2)
> +  override def bufferEncoder: Encoder[Option[String]] = ExpressionEncoder()
> +  override def outputEncoder: Encoder[Option[String]] = ExpressionEncoder()
> +}
> +
>  class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
>    import testImplicits._
>  
> @@ -432,4 +442,15 @@ class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
>      val agg = df.select(mode(col("a"))).as[String]
>      checkDataset(agg, "3")
>    }
> +
> +  test("typed aggregation: option string") {
> +    val ds = Seq((1, Some("a")), (1, None), (1, Some("c")), (2, None)).toDS()
> +
> +    checkDataset(
> +      ds.groupByKey(_._1).mapValues(_._2).agg(
> +        OptionStringAgg.toColumn
> +      ),
> +      (1, Some("ac")), (2, None)
> +    )
> +  }
>  }
> {code}

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
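As a plain-Scala sanity check of the expected values in the test above, the aggregator's merge logic can be exercised without any Spark dependency. This is my own sketch, not part of the patch; the object name OptionStringMerge is hypothetical. It also shows the failure mode described in the report: merge throws an NPE the moment the runtime hands it null instead of None.

```scala
// Standalone sketch of OptionStringAgg's merge logic (no Spark required).
object OptionStringMerge {
  // Same merge as in the test diff: concatenate when both sides are defined,
  // otherwise keep whichever side is defined.
  def merge(b1: Option[String], b2: Option[String]): Option[String] =
    b1.map { b1v => b2.map { b2v => b1v ++ b2v }.getOrElse(b1v) }.orElse(b2)

  def main(args: Array[String]): Unit = {
    // Group 1 from the test: Some("a"), None, Some("c") folds to Some("ac").
    val group1 = Seq(Some("a"), None, Some("c"))
    println(group1.foldLeft(Option.empty[String])(merge)) // Some(ac)

    // Group 2 from the test: a single None stays None.
    val group2 = Seq(Option.empty[String])
    println(group2.foldLeft(Option.empty[String])(merge)) // None

    // The reported bug: if the encoder produces null instead of None,
    // b1.map dereferences null and throws a NullPointerException:
    //   merge(null, Some("a"))  // NPE
  }
}
```
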
[jira] [Commented] (SPARK-44323) Scala None shows up as null for Aggregator BUF or OUT
[ https://issues.apache.org/jira/browse/SPARK-44323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740744#comment-17740744 ]

koert kuipers commented on SPARK-44323:
---------------------------------------

I think the issue is that Nones inside tuples now become null, so it is the use of nullSafe inside the childrenDeserializers for tuples, introduced in https://github.com/apache/spark/pull/40755

> Scala None shows up as null for Aggregator BUF or OUT
> -----------------------------------------------------
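The suspected mechanism can be sketched in a few lines of plain Scala. This is an illustration of the idea only, not Spark's actual childrenDeserializers code: an Option deserializer wants to map a NULL column to None, but a null-safe wrapper that short-circuits NULL input never lets the child run, so null comes out instead of None. All names here (optionDeserializer, nullSafe, NullSafeSketch) are mine.

```scala
// Hypothetical model of the reported bug: a null-safe wrapper around a
// tuple field's deserializer defeats the Option deserializer's NULL handling.
object NullSafeSketch {
  // An Option[String] deserializer: a NULL cell should become None.
  val optionDeserializer: String => Option[String] =
    cell => Option(cell)

  // A nullSafe-style wrapper: on NULL input, emit null directly and skip
  // the child deserializer entirely. Correct for most types, but wrong for
  // Option, whose child is exactly the code that turns NULL into None.
  def nullSafe[A >: Null](child: String => A): String => A =
    cell => if (cell == null) null else child(cell)

  def main(args: Array[String]): Unit = {
    println(optionDeserializer(null))           // None  (expected behavior)
    println(nullSafe(optionDeserializer)(null)) // null  (the observed bug)
  }
}
```
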