[ https://issues.apache.org/jira/browse/SPARK-34829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-34829:
------------------------------------

    Assignee: Apache Spark

> transform_values returns identical values when used with a udf that returns
> a reference type
> --------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34829
>                 URL: https://issues.apache.org/jira/browse/SPARK-34829
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Pavel Chernikov
>            Assignee: Apache Spark
>            Priority: Major
>
> If the return value of a {{udf}} passed to {{transform_values}} is an
> {{AnyRef}}, the transformation returns identical values for every map key
> (more precisely, each newly computed value overrides the values of all
> previously processed keys).
> Consider the following examples:
> {code:java}
> case class Bar(i: Int)
> val square = udf((b: Bar) => b.i * b.i)
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
> +------------------------------+------------------------+
> |map                           |map_square              |
> +------------------------------+------------------------+
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> 1, 2 -> 4, 3 -> 9}|
> +------------------------------+------------------------+
> {code}
> vs
> {code:java}
> case class Bar(i: Int)
> case class BarSquare(i: Int)
> val square = udf((b: Bar) => BarSquare(b.i * b.i))
> val df = Seq(Map(1 -> Bar(1), 2 -> Bar(2), 3 -> Bar(3))).toDF("map")
> df.withColumn("map_square", transform_values(col("map"), (_, v) => square(v))).show(truncate = false)
> +------------------------------+------------------------------+
> |map                           |map_square                    |
> +------------------------------+------------------------------+
> |{1 -> {1}, 2 -> {2}, 3 -> {3}}|{1 -> {9}, 2 -> {9}, 3 -> {9}}|
> +------------------------------+------------------------------+
> {code}
> or even just this one:
> {code:java}
> case class Foo(s: String)
> val reverse = udf((f: Foo) => f.s.reverse)
> val df = Seq(Map(1 -> Foo("abc"), 2 -> Foo("klm"), 3 -> Foo("xyz"))).toDF("map")
> df.withColumn("map_reverse", transform_values(col("map"), (_, v) => reverse(v))).show(truncate = false)
> +------------------------------------+------------------------------+
> |map                                 |map_reverse                   |
> +------------------------------------+------------------------------+
> |{1 -> {abc}, 2 -> {klm}, 3 -> {xyz}}|{1 -> zyx, 2 -> zyx, 3 -> zyx}|
> +------------------------------------+------------------------------+
> {code}
> After playing with {{org.apache.spark.sql.catalyst.expressions.TransformValues}},
> it looks like something goes wrong while executing this line:
> {code:java}
> resultValues.update(i, functionForEval.eval(inputRow))
> {code}
> To be more precise, it is all about {{functionForEval.eval(inputRow)}},
> because if you do something like this:
> {code:java}
> println(s"RESULTS PRIOR TO EVALUATION - $resultValues")
> val resultValue = functionForEval.eval(inputRow)
> println(s"RESULT - $resultValue")
> println(s"RESULTS PRIOR TO UPDATE - $resultValues")
> resultValues.update(i, resultValue)
> println(s"RESULTS AFTER UPDATE - $resultValues")
> {code}
> you'll see something like this in the logs:
> {code:java}
> RESULTS PRIOR TO EVALUATION - [null,null,null]
> RESULT - [0,1]
> RESULTS PRIOR TO UPDATE - [null,null,null]
> RESULTS AFTER UPDATE - [[0,1],null,null]
> ------
> RESULTS PRIOR TO EVALUATION - [[0,1],null,null]
> RESULT - [0,4]
> RESULTS PRIOR TO UPDATE - [[0,4],null,null]
> RESULTS AFTER UPDATE - [[0,4],[0,4],null]
> ------
> RESULTS PRIOR TO EVALUATION - [[0,4],[0,4],null]
> RESULT - [0,9]
> RESULTS PRIOR TO UPDATE - [[0,9],[0,9],null]
> RESULTS AFTER UPDATE - [[0,9],[0,9],[0,9]]
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
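The log trace above (where {{RESULTS PRIOR TO UPDATE}} already shows earlier slots changed to the newest value) is consistent with the evaluator reusing one mutable row for every {{functionForEval.eval}} call: storing the returned reference without copying makes every slot of {{resultValues}} alias the same buffer. A minimal plain-Scala sketch of that aliasing pattern, with no Spark involved ({{ReusableRow}}, {{eval}}, {{buggy}}, and {{fixed}} are hypothetical stand-ins, not Spark APIs):

```scala
// Sketch of the aliasing bug: one mutable "row" is reused by every eval()
// call, so storing the raw reference overrides all previously stored results.
object AliasingDemo {
  // Hypothetical stand-in for the reusable row returned by functionForEval.eval.
  final class ReusableRow(var value: Int)

  private val shared = new ReusableRow(0)
  private def eval(i: Int): ReusableRow = { shared.value = i * i; shared }

  // Buggy: keeps the reference, so every slot ends up aliasing `shared`
  // and all entries show the last computed value.
  def buggy(): List[Int] = {
    val out = new Array[ReusableRow](3)
    for (i <- 0 until 3) out(i) = eval(i + 1)
    out.toList.map(_.value)
  }

  // Fixed: copies the value out of the reused buffer before storing it.
  def fixed(): List[Int] = {
    val out = new Array[Int](3)
    for (i <- 0 until 3) out(i) = eval(i + 1).value
    out.toList
  }

  def main(args: Array[String]): Unit = {
    println(buggy()) // List(9, 9, 9) - every key got the last value
    println(fixed()) // List(1, 4, 9)
  }
}
```

If this is indeed the mechanism, the fix direction it suggests for {{TransformValues}} is to copy the evaluated result before calling {{resultValues.update(i, ...)}} instead of storing the reused reference; primitive returns (the first example above) are unaffected because they are copied by value.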