[ https://issues.apache.org/jira/browse/SPARK-29503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-29503:
-----------------------------------

    Assignee: Jungtaek Lim

> MapObjects doesn't copy Unsafe data when nested under Safe data
> ---------------------------------------------------------------
>
>                 Key: SPARK-29503
>                 URL: https://issues.apache.org/jira/browse/SPARK-29503
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1, 3.0.0
>            Reporter: Aaron Lewis
>            Assignee: Jungtaek Lim
>            Priority: Major
>              Labels: correctness
>
> In order for MapObjects to operate safely, it checks whether the result of
> the mapping function is an Unsafe type (UnsafeRow, UnsafeArrayData,
> UnsafeMapData) and performs a copy before writing it into MapObjects'
> output array. This protects against expressions that reuse the same native
> memory buffer to represent their result across evaluations; without the
> copy, every result would point to the same native buffer and would reflect
> the last value written to it. However, MapObjects misses this needed copy
> when the Unsafe data is nested below some Safe structure, for instance a
> GenericArrayData whose elements are all UnsafeRows. In this scenario, all
> elements of the GenericArrayData end up pointing to the same native
> UnsafeRow buffer, which holds the last value written to it.
>
> Right now, this bug seems to occur only when a `ProjectExec` goes down the
> `execute` path, as opposed to WholeStageCodegen's `produce` and `consume`
> path.
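The buffer-reuse hazard described above can be sketched without any Spark dependency. The `ReusedRow` class below is a hypothetical stand-in for an `UnsafeRow` backed by a shared buffer (it is not a Spark type); storing it without a copy aliases every output slot to the same object, which is exactly the reported failure mode, while copying before storing, as `MapObjects` already does for top-level Unsafe results, preserves each evaluation:

```scala
// Minimal sketch (no Spark dependency). ReusedRow is a hypothetical
// stand-in for an UnsafeRow that points at a buffer reused across
// evaluations of an expression.
final class ReusedRow {
  var value: Int = 0
  def copy(): ReusedRow = { val r = new ReusedRow; r.value = value; r }
  override def toString: String = s"[$value]"
}

object ReuseDemo {
  val shared = new ReusedRow

  // Without a copy, every slot in the output array aliases the same
  // object, so all entries show the last value written -- the bug.
  val buggy: Seq[ReusedRow] = Seq(1, 2, 3).map { i =>
    shared.value = i
    shared
  }

  // Copying before storing preserves each evaluation's result -- the fix.
  val fixed: Seq[ReusedRow] = Seq(1, 2, 3).map { i =>
    shared.value = i
    shared.copy()
  }
}
```

Under this sketch, `ReuseDemo.buggy` holds three references to the same object (all reading the last value, 3), mirroring the `[3], [3], [3]` output below, while `ReuseDemo.fixed` holds three independent copies with values 1, 2, 3.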
>
> Example reproduction code:
> {code:scala}
> import org.apache.spark.sql.catalyst.expressions.objects.MapObjects
> import org.apache.spark.sql.catalyst.expressions.CreateArray
> import org.apache.spark.sql.catalyst.expressions.Expression
> import org.apache.spark.sql.functions.{array, struct}
> import org.apache.spark.sql.Column
> import org.apache.spark.sql.types.ArrayType
>
> // For the purpose of demonstration, we need to disable WholeStage codegen
> spark.conf.set("spark.sql.codegen.wholeStage", "false")
>
> val exampleDS = spark.sparkContext.parallelize(Seq(Seq(1, 2, 3))).toDF("items")
>
> // Trivial example: nest an unsafe struct inside a safe array
> // items: Seq[Int] => items.map{item => Seq(Struct(item))}
> val result = exampleDS.select(
>   new Column(MapObjects(
>     {item: Expression => array(struct(new Column(item))).expr},
>     $"items".expr,
>     exampleDS.schema("items").dataType.asInstanceOf[ArrayType].elementType
>   )) as "items"
> )
> result.show(10, false)
> {code}
>
> Actual output:
> {code:java}
> +---------------------------------------------------------+
> |items                                                    |
> +---------------------------------------------------------+
> |[WrappedArray([3]), WrappedArray([3]), WrappedArray([3])]|
> +---------------------------------------------------------+
> {code}
>
> Expected output:
> {code:java}
> +---------------------------------------------------------+
> |items                                                    |
> +---------------------------------------------------------+
> |[WrappedArray([1]), WrappedArray([2]), WrappedArray([3])]|
> +---------------------------------------------------------+
> {code}
>
> We've confirmed that the bug exists on version 2.1.1 as well as on master
> (which I assume corresponds to version 3.0.0?).

--
This message was sent by Atlassian Jira
(v8.3.4#803005)