Aaron Lewis created SPARK-29503:
-----------------------------------

             Summary: MapObjects doesn't copy Unsafe data when nested under Safe data
                 Key: SPARK-29503
                 URL: https://issues.apache.org/jira/browse/SPARK-29503
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.1.1, 3.0.0
            Reporter: Aaron Lewis


In order for MapObjects to operate safely, it checks whether the result of
the mapping function is an Unsafe type (UnsafeRow, UnsafeArrayData,
UnsafeMapData) and performs a copy before writing it into MapObjects' output
array. This protects against expressions that re-use the same native memory
buffer for their result across evaluations; without the copy, every element
of the output array would point to the same native buffer and would
therefore hold the last result written to it. However, MapObjects misses
this needed copy when the Unsafe data is nested below some Safe structure,
for instance a GenericArrayData whose elements are all UnsafeRows. In that
scenario, all elements of the GenericArrayData end up pointing to the same
native UnsafeRow buffer, which holds the last value written to it.
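
As background, the buffer re-use that makes this copy necessary is easy to
demonstrate directly with UnsafeProjection, which writes every evaluation
into a single shared UnsafeRow. This is a minimal spark-shell sketch,
independent of MapObjects, just to illustrate the aliasing:
{code:scala}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Expression, UnsafeProjection}
import org.apache.spark.sql.types.IntegerType

// UnsafeProjection writes each result into one shared buffer, so every
// returned UnsafeRow is the same object unless the caller copies it.
val proj = UnsafeProjection.create(Seq[Expression](BoundReference(0, IntegerType, nullable = false)))

val aliased = Seq(1, 2, 3).map(i => proj(InternalRow(i)))
aliased.map(_.getInt(0))  // List(3, 3, 3): every element aliases the buffer

val copied = Seq(1, 2, 3).map(i => proj(InternalRow(i)).copy())
copied.map(_.getInt(0))   // List(1, 2, 3): copies are independent
{code}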


Right now, this bug seems to occur only when a `ProjectExec` goes down the
`execute` path, as opposed to WholeStageCodegen's `produce`/`consume` path
(see the plan check after the expected output below).


Example Reproduction Code:
{code:scala}
import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.functions.{array, struct}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.ArrayType
import spark.implicits._ // for toDF and $"..."; already in scope in spark-shell

// For the purpose of demonstration, we need to disable WholeStage codegen
spark.conf.set("spark.sql.codegen.wholeStage", "false")

val exampleDS = spark.sparkContext.parallelize(Seq(Seq(1, 2, 3))).toDF("items")

// Trivial example: nest an unsafe struct inside a safe array
// items: Seq[Int] => items.map{item => Seq(Struct(item))}
val result = exampleDS.select(
    new Column(MapObjects(
        {item: Expression => array(struct(new Column(item))).expr},
        $"items".expr,
        exampleDS.schema("items").dataType.asInstanceOf[ArrayType].elementType
    )) as "items"
)

result.show(10, false)
{code}

Actual Output:
{code:java}
+---------------------------------------------------------+
|items                                                    |
+---------------------------------------------------------+
|[WrappedArray([3]), WrappedArray([3]), WrappedArray([3])]|
+---------------------------------------------------------+
{code}

Expected Output:
{code:java}
+---------------------------------------------------------+
|items                                                    |
+---------------------------------------------------------+
|[WrappedArray([1]), WrappedArray([2]), WrappedArray([3])]|
+---------------------------------------------------------+
{code}
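
To double-check that the repro exercises the interpreted `ProjectExec` path,
the physical plan can be inspected; with the codegen flag disabled as above,
no WholeStageCodegen stage should appear in the plan:
{code:scala}
// With spark.sql.codegen.wholeStage=false, the plan should show a plain
// Project node rather than a WholeStageCodegen stage.
result.explain()
{code}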

We've confirmed that the bug exists on version 2.1.1 as well as on master
(which I assume corresponds to version 3.0.0?).
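
For what it's worth, any fix would presumably need to recurse through Safe
containers and copy the Unsafe leaves before MapObjects stores a result. The
following is only a hypothetical sketch of that idea (`deepCopyUnsafe` is
not an existing Spark helper), not an actual patch:
{code:scala}
import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeMapData, UnsafeRow}
import org.apache.spark.sql.catalyst.util.GenericArrayData

// Hypothetical: copy Unsafe data even when it is nested inside a Safe
// container such as GenericArrayData, instead of only at the top level.
def deepCopyUnsafe(value: Any): Any = value match {
  case r: UnsafeRow        => r.copy()
  case a: UnsafeArrayData  => a.copy()
  case m: UnsafeMapData    => m.copy()
  case g: GenericArrayData => new GenericArrayData(g.array.map(deepCopyUnsafe))
  case other               => other
}
{code}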
