[ 
https://issues.apache.org/jira/browse/SPARK-29503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-29503:
-----------------------------------

    Assignee: Jungtaek Lim

> MapObjects doesn't copy Unsafe data when nested under Safe data
> ---------------------------------------------------------------
>
>                 Key: SPARK-29503
>                 URL: https://issues.apache.org/jira/browse/SPARK-29503
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1, 3.0.0
>            Reporter: Aaron Lewis
>            Assignee: Jungtaek Lim
>            Priority: Major
>              Labels: correctness
>
> In order for MapObjects to operate safely, it checks whether the result of
> the mapping function is an Unsafe type (UnsafeRow, UnsafeArrayData,
> UnsafeMapData) and performs a copy before writing the result into
> MapObjects' output array. This protects against expressions that re-use the
> same native memory buffer for their results across evaluations; without the
> copy, every element would point to the same native buffer and would
> therefore reflect only the last result written to it. However, MapObjects
> misses this needed copy when the Unsafe data is nested below some safe
> structure, for instance a GenericArrayData whose elements are all
> UnsafeRows. In that scenario, all elements of the GenericArrayData point to
> the same native UnsafeRow buffer, which holds the last value written to it.
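>
> To illustrate the buffer re-use being described, here is a minimal,
> hypothetical sketch (not Spark source; assumes a spark-shell session) built
> on UnsafeProjection, which writes every result into one reused buffer:
> {code:scala}
> import org.apache.spark.sql.catalyst.InternalRow
> import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
> import org.apache.spark.sql.types.{DataType, IntegerType}
>
> val proj   = UnsafeProjection.create(Array[DataType](IntegerType))
> val inputs = Seq(InternalRow(1), InternalRow(2), InternalRow(3))
>
> // Without copy(), every element aliases the projection's single buffer.
> val aliased = inputs.map(proj(_))
> // With copy(), each element owns its bytes -- the copy MapObjects must do.
> val copied  = inputs.map(r => proj(r).copy())
>
> aliased.map(_.getInt(0)) // Seq(3, 3, 3): all see the last write
> copied.map(_.getInt(0))  // Seq(1, 2, 3): independent results
> {code}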
>  
> Right now, this bug seems to occur only when a `ProjectExec` goes down the
> interpreted `execute` path, as opposed to WholeStageCodegen's `produce` and
> `consume` path; a plan check is sketched after the reproduction code below.
>  
> Example Reproduction Code:
> {code:scala}
> import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
> import org.apache.spark.sql.catalyst.expressions.{CreateArray, Expression}
> import org.apache.spark.sql.functions.{array, struct}
> import org.apache.spark.sql.Column
> import org.apache.spark.sql.types.ArrayType
> import spark.implicits._ // for toDF and $"..." outside spark-shell
>
> // For the purpose of demonstration, we need to disable whole-stage codegen
> spark.conf.set("spark.sql.codegen.wholeStage", "false")
>
> val exampleDS = spark.sparkContext.parallelize(Seq(Seq(1, 2, 3))).toDF("items")
>
> // Trivial example: nest an unsafe struct inside a safe array
> // items: Seq[Int] => items.map{item => Seq(Struct(item))}
> val result = exampleDS.select(
>     new Column(MapObjects(
>         {item: Expression => array(struct(new Column(item))).expr},
>         $"items".expr,
>         exampleDS.schema("items").dataType.asInstanceOf[ArrayType].elementType
>     )) as "items"
> )
> result.show(10, false)
> {code}
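>
> A hedged way to confirm the interpreted path mentioned above: operators
> compiled by whole-stage codegen are prefixed with "*" in the physical plan
> ("*(n)" on newer versions), so with codegen disabled the Project node
> should show up bare.
> {code:scala}
> result.explain()
> // Look for a plain "Project [mapobjects(...)]" with no "*" prefix.
> {code}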
>  
> Actual Output:
> {code:java}
> +---------------------------------------------------------+
> |items                                                    |
> +---------------------------------------------------------+
> |[WrappedArray([3]), WrappedArray([3]), WrappedArray([3])]|
> +---------------------------------------------------------+
> {code}
>  
> Expected Output:
> {code:java}
> +---------------------------------------------------------+
> |items                                                    |
> +---------------------------------------------------------+
> |[WrappedArray([1]), WrappedArray([2]), WrappedArray([3])]|
> +---------------------------------------------------------+
> {code}
>  
> We've confirmed that the bug exists on version 2.1.1 as well as on current
> master (which I assume corresponds to version 3.0.0).
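>
> As a hedged sanity check rather than a fix: since the miscopy only shows up
> on the interpreted path, re-enabling whole-stage codegen (its default) and
> rebuilding the query sidesteps it for this example. This assumes the
> reproduction snippet above has already been run.
> {code:scala}
> spark.conf.set("spark.sql.codegen.wholeStage", "true")
> // Rebuild the Dataset so the physical plan is derived under the new conf;
> // the previously materialized plan of `result` would be reused otherwise.
> val resultCodegen = exampleDS.select(
>     new Column(MapObjects(
>         {item: Expression => array(struct(new Column(item))).expr},
>         $"items".expr,
>         exampleDS.schema("items").dataType.asInstanceOf[ArrayType].elementType
>     )) as "items"
> )
> resultCodegen.show(10, false)
> // Should print the expected output above, per the codepath note.
> {code}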
>  


