[ https://issues.apache.org/jira/browse/SPARK-44323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17740744#comment-17740744 ]

koert kuipers commented on SPARK-44323:
---------------------------------------

I think the issue is that a None inside a tuple now becomes null.

So it's the usage of nullSafe inside the childrenDeserializers for tuples
introduced in https://github.com/apache/spark/pull/40755
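
To illustrate what I believe changed, here is a minimal pure-Scala sketch. It is not actual Spark code, and the names decodeOption/decodeNullSafe are hypothetical; it only mimics the semantics: previously the Option deserializer ran even on a null input and lifted it into None, while a nullSafe-style wrapper short-circuits on null and returns null without ever invoking the child deserializer.

```scala
// Hypothetical sketch of the deserializer semantics; not actual Spark code.
object NullSafeSketch {
  // Old behavior: the Option deserializer runs even for a null value,
  // so null is lifted into None.
  def decodeOption(raw: String): Option[String] = Option(raw)

  // New behavior: a nullSafe-style wrapper around each tuple child
  // short-circuits on null and never calls the Option deserializer,
  // so null leaks through as null instead of None.
  def decodeNullSafe(raw: String): Option[String] =
    if (raw == null) null else decodeOption(raw)

  def main(args: Array[String]): Unit = {
    assert(decodeOption(null) == None)   // old semantics: None
    assert(decodeNullSafe(null) == null) // new semantics: null
  }
}
```

If this is right, any Option-typed field nested inside a tuple (such as an Aggregator's BUF or OUT wrapped in the grouping key tuple) would surface as null rather than None.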

> Scala None shows up as null for Aggregator BUF or OUT  
> -------------------------------------------------------
>
>                 Key: SPARK-44323
>                 URL: https://issues.apache.org/jira/browse/SPARK-44323
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.1
>            Reporter: koert kuipers
>            Priority: Major
>
> When upgrading from Spark 3.3.1 to Spark 3.4.1 we suddenly started getting
> null pointer exceptions in Aggregators (classes extending
> org.apache.spark.sql.expressions.Aggregator) that use Scala Option for BUF
> and/or OUT: basically, None now shows up as null.
> After adding a simple test case and doing a binary search over the commits,
> we landed on SPARK-37829 as the cause.
> We first observed the issue as an NPE inside Aggregator.merge, because None
> was null. I am having a hard time replicating that in a Spark unit test, but
> I did manage to get a None to become null in the output. A simple test that
> now fails:
>  
> {code:scala}
> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> index e9daa825dd4..a1959d7065d 100644
> --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetAggregatorSuite.scala
> @@ -228,6 +228,16 @@ case class FooAgg(s: Int) extends Aggregator[Row, Int, Int] {
>    def outputEncoder: Encoder[Int] = Encoders.scalaInt
>  }
>  
> +object OptionStringAgg extends Aggregator[Option[String], Option[String], Option[String]] {
> +  override def zero: Option[String] = None
> +  override def reduce(b: Option[String], a: Option[String]): Option[String] = merge(b, a)
> +  override def finish(reduction: Option[String]): Option[String] = reduction
> +  override def merge(b1: Option[String], b2: Option[String]): Option[String] =
> +    b1.map { b1v => b2.map { b2v => b1v ++ b2v }.getOrElse(b1v) }.orElse(b2)
> +  override def bufferEncoder: Encoder[Option[String]] = ExpressionEncoder()
> +  override def outputEncoder: Encoder[Option[String]] = ExpressionEncoder()
> +}
> +
>  class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
>    import testImplicits._
>  
> @@ -432,4 +442,15 @@ class DatasetAggregatorSuite extends QueryTest with SharedSparkSession {
>      val agg = df.select(mode(col("a"))).as[String]
>      checkDataset(agg, "3")
>    }
> +
> +  test("typed aggregation: option string") {
> +    val ds = Seq((1, Some("a")), (1, None), (1, Some("c")), (2, None)).toDS()
> +
> +    checkDataset(
> +      ds.groupByKey(_._1).mapValues(_._2).agg(
> +        OptionStringAgg.toColumn
> +      ),
> +      (1, Some("ac")), (2, None)
> +    )
> +  }
>  }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
