Emil Ejbyfeldt created SPARK-52023:
--------------------------------------
Summary: udaf returning Option can cause data corruption and crashes
Key: SPARK-52023
URL: https://issues.apache.org/jira/browse/SPARK-52023
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.5.5, 4.0.0
Reporter: Emil Ejbyfeldt
Using `udaf` with an `Option` return type can cause segfaults and/or data corruption. The following reproduces it:

```
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
// The next two imports are already in scope in spark-shell.
import org.apache.spark.sql.functions.udaf
import spark.implicits._

// Generic reducer that uses null as the "no value yet" buffer.
final case class Reduce[T: Encoder](r: (T, T) => T) extends Aggregator[T, T, T] {
  def zero: T = null.asInstanceOf[T]
  def reduce(b: T, a: T): T = if (b == null) a else r(b, a)
  def merge(b1: T, b2: T): T = if (b1 == null) b2 else if (b2 == null) b1 else r(b1, b2)
  def finish(reduction: T): T = reduction
  def bufferEncoder: Encoder[T] = implicitly
  def outputEncoder: Encoder[T] = implicitly
}

type V = (String, Seq[String])
// Buffer and output type are both Option[V].
val agg = udaf(Reduce[Option[V]]((option1, option2) => option1.zip(option2).map(p => p._1)))

val d = Seq[(Int, Option[V])]((1, Some(("a", Seq("a", "b")))))
spark.createDataset(d).groupBy($"_1").agg(agg($"_2")).collect()
spark.createDataset(d).agg(agg($"_2")).collect()
```

This produces the following REPL output:

```
val d: Seq[(Int, Option[V])] = List((1,Some((a,List(a, b)))))
val res14: Array[org.apache.spark.sql.Row] = Array([[????????????????0??? ???a??????????????????????????? ???????(???a???????b???????,ArraySeq()]])
```

The result is clearly corrupted: the string field holds unreadable bytes and the sequence field comes back as an empty `ArraySeq()`. I have also seen crashes and errors about incorrect string sizes.
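For comparison, here is a minimal plain-Scala sketch (no Spark involved) of what `Reduce[Option[V]]` should compute for the input `d`, obtained by replaying its `zero`, `reduce`, `merge`, and `finish` logic over a list of partitions. The object name `ReduceSimulation` and the partition layout are illustrative assumptions, not Spark internals. It assumes Scala 2.13, where `Option.zip` returns an `Option`.

```scala
// Hypothetical helper: replays the Reduce aggregator's logic in plain Scala
// to show the expected (uncorrupted) result for the single input row.
object ReduceSimulation {
  type V = (String, Seq[String])

  // Same combine function as in the report (keeps the left value when both are defined).
  val r: (Option[V], Option[V]) => Option[V] =
    (option1, option2) => option1.zip(option2).map(p => p._1)

  // Mirrors Reduce.zero / reduce / merge / finish for T = Option[V].
  def zero: Option[V] = null.asInstanceOf[Option[V]]
  def reduce(b: Option[V], a: Option[V]): Option[V] = if (b == null) a else r(b, a)
  def merge(b1: Option[V], b2: Option[V]): Option[V] =
    if (b1 == null) b2 else if (b2 == null) b1 else r(b1, b2)
  def finish(reduction: Option[V]): Option[V] = reduction

  // Reduce each partition from zero, merge the partial buffers, then finish.
  def aggregate(partitions: Seq[Seq[Option[V]]]): Option[V] =
    finish(partitions.map(_.foldLeft(zero)(reduce)).foldLeft(zero)(merge))

  val input: Seq[Option[V]] = Seq(Some(("a", Seq("a", "b"))))

  def main(args: Array[String]): Unit =
    println(aggregate(Seq(input))) // prints Some((a,List(a, b)))
}
```

Under this sketch, the aggregated value for the single group is `Some((a,List(a, b)))`, regardless of whether an empty partition is merged in first, rather than the garbled struct shown above.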