[ 
https://issues.apache.org/jira/browse/SPARK-52023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Emil Ejbyfeldt updated SPARK-52023:
-----------------------------------
    Description: 
Using udaf with `Option` return type can cause segfaults and/or data corruption.

 

 
{code:java}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder
final case class Reduce[T: Encoder](r: (T, T) => T) extends Aggregator[T, T, T]
{   def zero: T = null.asInstanceOf[T]   def reduce(b: T, a: T): T = if (b == 
null) a else r(b, a)   def merge(b1: T, b2: T): T = if (b1 == null) b2 else if 
(b2 == null) b1 else r(b1, b2)   def finish(reduction: T): T = reduction   def 
bufferEncoder: Encoder[T] = implicitly   def outputEncoder: Encoder[T] = 
implicitly }
type V = (String, Seq[String])
val agg = udaf(Reduce[Option[V]]((option1, option2) => 
option1.zip(option2).map(p => p._1)))
val d = Seq[(Int, Option[V])]((1, Some(("a", Seq("a", "b")))))
spark.createDataset(d).groupBy($"_1").agg(agg($"_2")).collect()
spark.createDataset(d).agg(agg($"_2")).collect()
```
produces output
```
val d: Seq[(Int, Option[V])] = List((1,Some((a,List(a, b)))))
val res14: Array[org.apache.spark.sql.Row] = Array([[????????????????0??? 
???a??????????????????????????? ???????(???a???????b???????,ArraySeq()]])
{code}




So the output looks like corrupt.  But I have also seen crashes and complaints 
about incorrect string sizes.

  was:
Using udaf with `Option` return type can cause segfaults and/or data corruption.

```

import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.Encoder

final case class Reduce[T: Encoder](r: (T, T) => T) extends Aggregator[T, T, T] 
{
  def zero: T = null.asInstanceOf[T]
  def reduce(b: T, a: T): T = if (b == null) a else r(b, a)
  def merge(b1: T, b2: T): T = if (b1 == null) b2 else if (b2 == null) b1 else 
r(b1, b2)
  def finish(reduction: T): T = reduction
  def bufferEncoder: Encoder[T] = implicitly
  def outputEncoder: Encoder[T] = implicitly
}

type V = (String, Seq[String])
val agg = udaf(Reduce[Option[V]]((option1, option2) => 
option1.zip(option2).map(p => p._1)))

val d = Seq[(Int, Option[V])]((1, Some(("a", Seq("a", "b")))))
spark.createDataset(d).groupBy($"_1").agg(agg($"_2")).collect()
spark.createDataset(d).agg(agg($"_2")).collect()
```
produces output
```
val d: Seq[(Int, Option[V])] = List((1,Some((a,List(a, b)))))
val res14: Array[org.apache.spark.sql.Row] = Array([[????????????????0??? 
???a??????????????????????????? ???????(???a???????b???????,ArraySeq()]])
```

So the output looks like corrupt.  But I have also seen crashes and complaints 
about incorrect string sizes.


> udaf returning Option can cause data corruption and crashes
> -----------------------------------------------------------
>
>                 Key: SPARK-52023
>                 URL: https://issues.apache.org/jira/browse/SPARK-52023
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0, 3.5.5
>            Reporter: Emil Ejbyfeldt
>            Priority: Major
>
> Using udaf with `Option` return type can cause segfaults and/or data 
> corruption.
>  
>  
> {code:java}
> import org.apache.spark.sql.expressions.Aggregator
> import org.apache.spark.sql.Encoder
> final case class Reduce[T: Encoder](r: (T, T) => T) extends Aggregator[T, T, 
> T]
> {   def zero: T = null.asInstanceOf[T]   def reduce(b: T, a: T): T = if (b == 
> null) a else r(b, a)   def merge(b1: T, b2: T): T = if (b1 == null) b2 else 
> if (b2 == null) b1 else r(b1, b2)   def finish(reduction: T): T = reduction   
> def bufferEncoder: Encoder[T] = implicitly   def outputEncoder: Encoder[T] = 
> implicitly }
> type V = (String, Seq[String])
> val agg = udaf(Reduce[Option[V]]((option1, option2) => 
> option1.zip(option2).map(p => p._1)))
> val d = Seq[(Int, Option[V])]((1, Some(("a", Seq("a", "b")))))
> spark.createDataset(d).groupBy($"_1").agg(agg($"_2")).collect()
> spark.createDataset(d).agg(agg($"_2")).collect()
> ```
> produces output
> ```
> val d: Seq[(Int, Option[V])] = List((1,Some((a,List(a, b)))))
> val res14: Array[org.apache.spark.sql.Row] = Array([[????????????????0??? 
> ???a??????????????????????????? ???????(???a???????b???????,ArraySeq()]])
> {code}
> So the output looks like corrupt.  But I have also seen crashes and 
> complaints about incorrect string sizes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to