Emil Ejbyfeldt created SPARK-52023:
--------------------------------------

             Summary: udaf returning Option can cause data corruption and crashes
                 Key: SPARK-52023
                 URL: https://issues.apache.org/jira/browse/SPARK-52023
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.5.5, 4.0.0
            Reporter: Emil Ejbyfeldt


Using `udaf` with an `Aggregator` whose return type is an `Option` can cause segfaults and/or data corruption. Reproducer:

```

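// Run in spark-shell, which provides `spark` and imports `spark.implicits._`.
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.functions.udaf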

// Generic reducer that uses null as the empty buffer.
final case class Reduce[T: Encoder](r: (T, T) => T) extends Aggregator[T, T, T] {
  def zero: T = null.asInstanceOf[T]
  def reduce(b: T, a: T): T = if (b == null) a else r(b, a)
  def merge(b1: T, b2: T): T =
    if (b1 == null) b2 else if (b2 == null) b1 else r(b1, b2)
  def finish(reduction: T): T = reduction
  def bufferEncoder: Encoder[T] = implicitly
  def outputEncoder: Encoder[T] = implicitly
}

type V = (String, Seq[String])
val agg = udaf(Reduce[Option[V]]((option1, option2) =>
  option1.zip(option2).map(p => p._1)))

val d = Seq[(Int, Option[V])]((1, Some(("a", Seq("a", "b")))))
spark.createDataset(d).groupBy($"_1").agg(agg($"_2")).collect()
spark.createDataset(d).agg(agg($"_2")).collect()
```
produces the following output:
```
val d: Seq[(Int, Option[V])] = List((1,Some((a,List(a, b)))))
val res14: Array[org.apache.spark.sql.Row] = Array([[????????????????0??? 
???a??????????????????????????? ???????(???a???????b???????,ArraySeq()]])
```

So the output appears to be corrupted. I have also seen crashes and error messages about incorrect string sizes.
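For comparison, the value the aggregation should return can be computed without Spark by applying the same `zero`/`reduce`/`merge`/`finish` steps to a local collection. This is a minimal, hypothetical sketch: `ExpectedResult`, `ReduceLocal` and `run` are illustrative names, not Spark API; partitions are modelled as a plain `Seq[Seq[T]]`; and it assumes Scala 2.13, where `Option.zip` returns an `Option` (as the `ArraySeq` in the output above suggests). It never touches Spark's encoders, so it only shows the expected result, not the cause of the corruption.

```scala
// Hypothetical, Spark-free model of the Reduce aggregator above, used only
// to compute the result the udaf is expected to return. Assumes Scala 2.13.
object ExpectedResult {
  type V = (String, Seq[String])

  // Same zero/reduce/merge/finish semantics as Reduce[T], with null as the empty buffer.
  final case class ReduceLocal[T](r: (T, T) => T) {
    def zero: T = null.asInstanceOf[T]
    def reduce(b: T, a: T): T = if (b == null) a else r(b, a)
    def merge(b1: T, b2: T): T =
      if (b1 == null) b2 else if (b2 == null) b1 else r(b1, b2)
    def finish(reduction: T): T = reduction

    // Fold each "partition" with reduce, then combine the partial buffers with merge.
    def run(partitions: Seq[Seq[T]]): T =
      finish(partitions.map(_.foldLeft(zero)(reduce)).foldLeft(zero)(merge))
  }

  def main(args: Array[String]): Unit = {
    val agg = ReduceLocal[Option[V]]((o1, o2) => o1.zip(o2).map(p => p._1))
    // Same single input row as the reproducer: (1, Some(("a", Seq("a", "b")))).
    val result = agg.run(Seq(Seq(Some(("a", Seq("a", "b"))))))
    println(result) // Some((a,List(a, b)))
  }
}
```

With a single input row, `reduce(zero, a)` simply returns `a`, so the aggregation should yield the input value `Some((a,List(a, b)))` unchanged rather than the garbled row shown above.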



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
