[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Benedeki updated SPARK-35371: ----------------------------------- Description: When using an UDF returning string or complex type (Struct) on array members the resulting array consists of the last array member UDF result. h3. *Example code:* {color:#0033b3}import {color}org.apache.spark.sql.{{color:#000000}Column, SparkSession} import org.apache.spark.sql.functions{color}.\{callUDF, col, transform, udf} {color:#0033b3}val {color}{color:#000000}sparkBuilder{color}: {color:#000000}SparkSession{color}.{color:#000000}Builder {color}= {color:#000000}SparkSession{color}.builder() .master({color:#067d17}"local[*]"{color}) .appName({color:#067d17}s"Udf Bug Demo"{color}) .config({color:#067d17}"spark.ui.enabled"{color}, {color:#067d17}"false"{color}) .config({color:#067d17}"spark.debug.maxToStringFields"{color}, {color:#1750eb}100{color}) {color:#0033b3}val {color}{color:#000000}spark{color}: {color:#000000}SparkSession {color}= {color:#000000}sparkBuilder{color} .config({color:#067d17}"spark.driver.bindAddress"{color}, {color:#067d17}"127.0.0.1"{color}) .config({color:#067d17}"spark.driver.host"{color}, {color:#067d17}"127.0.0.1"{color}) .getOrCreate() {color:#0033b3}import {color}{color:#000000}spark{color}.{color:#000000}implicits{color}._ {color:#0033b3}case class {color}{color:#000000}Foo{color}(num: Int, s: {color:#007e8a}String{color}) {color:#0033b3}val {color}{color:#000000}src {color}= {color:#871094}Seq{color}( ({color:#1750eb}1{color}, {color:#1750eb}2{color}, Array({color:#1750eb}1{color}, {color:#1750eb}2{color}, {color:#1750eb}3{color})), ({color:#1750eb}2{color}, {color:#1750eb}2{color}, Array({color:#1750eb}2{color}, {color:#1750eb}2{color}, {color:#1750eb}2{color})), ({color:#1750eb}3{color}, {color:#1750eb}4{color}, Array({color:#1750eb}3{color}, {color:#1750eb}4{color}, {color:#1750eb}3{color}, {color:#1750eb}4{color})) ).toDF({color:#067d17}"A"{color}, {color:#067d17}"B"{color}, {color:#067d17}"C"{color}) {color:#0033b3}val 
{color}{color:#000000}udfStringName {color}= {color:#067d17}"UdfString"{color}{color:#0033b3}val {color}{color:#000000}udfIntName {color}= {color:#067d17}"UdfInt"{color}{color:#0033b3}val {color}{color:#000000}udfStructName {color}= {color:#067d17}"UdfStruct"{color}{color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => { (num + {color:#1750eb}1{color}).toString }) {color:#000000}spark{color}.udf.register({color:#000000}udfStringName{color}, {color:#000000}udfString{color}) {color:#0033b3}val {color}{color:#000000}udfInt {color}= udf((num: Int) => { num + {color:#1750eb}1 }) spark{color}.udf.register({color:#000000}udfIntName{color}, {color:#000000}udfInt{color}) {color:#0033b3}val {color}{color:#000000}udfStruct {color}= udf((num: Int) => { Foo(num + {color:#1750eb}1{color}, (num + {color:#1750eb}1{color}).toString) }) {color:#000000}spark{color}.udf.register({color:#000000}udfStructName{color}, {color:#000000}udfStruct{color}) {color:#0033b3}val {color}{color:#000000}lambdaString {color}= (forCol: {color:#000000}Column{color}) => callUDF({color:#000000}udfStringName{color}, forCol) {color:#0033b3}val {color}{color:#000000}lambdaInt {color}= (forCol: {color:#000000}Column{color}) => callUDF({color:#000000}udfIntName{color}, forCol) {color:#0033b3}val {color}{color:#000000}lambdaStruct {color}= (forCol: {color:#000000}Column{color}) => callUDF({color:#000000}udfStructName{color}, forCol) {color:#0033b3}val {color}{color:#000000}cA {color}= callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"A"{color})) {color:#0033b3}val {color}{color:#000000}cB {color}= callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"B"{color})) {color:#0033b3}val {color}{color:#000000}cCString{color}: {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), {color:#000000}lambdaString{color}) {color:#0033b3}val {color}{color:#000000}cCInt{color}: {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), 
{color:#000000}lambdaInt{color}) {color:#0033b3}val {color}{color:#000000}cCStruc{color}: {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), {color:#000000}lambdaStruct{color}) {color:#0033b3}val {color}{color:#000000}dest {color}= {color:#000000}src{color}.withColumn({color:#067d17}"AStr"{color}, {color:#000000}cA{color}) .withColumn({color:#067d17}"BStr"{color}, {color:#000000}cB{color}) .withColumn({color:#067d17}"CString (Wrong)"{color}, {color:#000000}cCString{color}) .withColumn({color:#067d17}"CInt (OK)"{color}, {color:#000000}cCInt{color}) .withColumn({color:#067d17}"CStruct (Wrong)"{color}, {color:#000000}cCStruc{color}) {color:#000000}dest{color}.show({color:#0033b3}false{color}) {color:#000000}dest{color}.printSchema() h3. *Expected:* {noformat} +---+---+------------+----+----+---------------+------------+--------------------------------+ |A |B |C |AStr|BStr|CString |CInt |CStruct | +---+---+------------+----+----+---------------+------------+--------------------------------+ |1 |2 |[1, 2, 3] |2 |3 |[2, 3, 4] |[2, 3, 4] |[{2, 2}, {3, 3}, {4, 4}] | |2 |2 |[2, 2, 2] |3 |3 |[3, 3, 3] |[3, 3, 3] |[{3, 3}, {3, 3}, {3, 3}] | |3 |4 |[3, 4, 3, 4]|4 |5 |[4, 5, 4, 5] |[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]| +---+---+------------+----+----+---------------+------------+--------------------------------+ {noformat} h3. *Got:* {noformat} +---+---+------------+----+----+---------------+------------+--------------------------------+ |A |B |C |AStr|BStr|CString (Wrong)|CInt (Ok) |CStruct (Wrong) | +---+---+------------+----+----+---------------+------------+--------------------------------+ |1 |2 |[1, 2, 3] |2 |3 |[4, 4, 4] |[2, 3, 4] |[{4, 4}, {4, 4}, {4, 4}] | |2 |2 |[2, 2, 2] |3 |3 |[3, 3, 3] |[3, 3, 3] |[{3, 3}, {3, 3}, {3, 3}] | |3 |4 |[3, 4, 3, 4]|4 |5 |[5, 5, 5, 5] |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]| +---+---+------------+----+----+---------------+------------+--------------------------------+ {noformat} h3. 
*Observation* * Works correctly on Spark 3.0.2 * When the UDF is registered as a Java UDF, it works as expected * The UDF is called the appropriate number of times (regardless of whether the UDF is marked as deterministic or non-deterministic). * When debugged, the correct value is actually saved into the result array at first, but every subsequent item processing overwrites the previous result values as well. Therefore the last item's values filling the array is the final result. * When the UDF returns NULL/None it does not “overwrite” the prior array values, nor is it “overwritten” by subsequent non-NULL values. See the following UDF implementation: {color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => { {color:#0033b3}if {color}(num == {color:#1750eb}3{color}) { {color:#000000}None{color} } {color:#0033b3}else {color}{ Some((num + {color:#1750eb}1{color}).toString) } }) was: When using a UDF returning string or complex type (Struct) on array members, the resulting array consists of the last array member's UDF result. h3. *Example code:* {color:#0033b3}import {color}org.apache.spark.sql.\{{color:#000000}Column, SparkSession} import org.apache.spark.sql.{color:#000000}functions{color}. 
{callUDF, col, transform, udf} {color:#0033b3}val {color}{color:#000000}sparkBuilder{color}: {color:#000000}SparkSession{color}.{color:#000000}Builder {color}= {color:#000000}SparkSession{color}.builder() .master({color:#067d17}"local[*]"{color}) .appName({color:#067d17}s"Udf Bug Demo"{color}) .config({color:#067d17}"spark.ui.enabled"{color}, {color:#067d17}"false"{color}) .config({color:#067d17}"spark.debug.maxToStringFields"{color}, {color:#1750eb}100{color}) {color:#0033b3}val {color}{color:#000000}spark{color}: {color:#000000}SparkSession {color}= {color:#000000}sparkBuilder{color} .config({color:#067d17}"spark.driver.bindAddress"{color}, {color:#067d17}"127.0.0.1"{color}) .config({color:#067d17}"spark.driver.host"{color}, {color:#067d17}"127.0.0.1"{color}) .getOrCreate() {color:#0033b3}import {color}{color:#000000}spark{color}.{color:#000000}implicits{color}._ {color:#0033b3}case class {color}{color:#000000}Foo{color}(num: Int, s: {color:#007e8a}String{color}) {color:#0033b3}val {color}{color:#000000}src {color}= {color:#871094}Seq{color}( ({color:#1750eb}1{color}, {color:#1750eb}2{color}, Array({color:#1750eb}1{color}, {color:#1750eb}2{color}, {color:#1750eb}3{color})), ({color:#1750eb}2{color}, {color:#1750eb}2{color}, Array({color:#1750eb}2{color}, {color:#1750eb}2{color}, {color:#1750eb}2{color})), ({color:#1750eb}3{color}, {color:#1750eb}4{color}, Array({color:#1750eb}3{color}, {color:#1750eb}4{color}, {color:#1750eb}3{color}, {color:#1750eb}4{color})) ).toDF({color:#067d17}"A"{color}, {color:#067d17}"B"{color}, {color:#067d17}"C"{color}) {color:#0033b3}val {color}{color:#000000}udfStringName {color}= {color:#067d17}"UdfString"{color}{color:#0033b3}val {color}{color:#000000}udfIntName {color}= {color:#067d17}"UdfInt"{color}{color:#0033b3}val {color}{color:#000000}udfStructName {color}= {color:#067d17}"UdfStruct"{color}{color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => { (num + {color:#1750eb}1{color}).toString }) 
{color:#000000}spark{color}.udf.register({color:#000000}udfStringName{color}, {color:#000000}udfString{color}) {color:#0033b3}val {color}{color:#000000}udfInt {color}= udf((num: Int) => { num + {color:#1750eb}1 \{color}}) spark{color}.udf.register({color:#000000}udfIntName{color}, {color:#000000}udfInt{color}) {color:#0033b3}val {color}{color:#000000}udfStruct {color}= udf((num: Int) => { Foo(num + {color:#1750eb}1{color}, (num + {color:#1750eb}1{color}).toString) }) {color:#000000}spark{color}.udf.register({color:#000000}udfStructName{color}, {color:#000000}udfStruct{color}) {color:#0033b3}val {color}{color:#000000}lambdaString {color}= (forCol: {color:#000000}Column{color}) => callUDF({color:#000000}udfStringName{color}, forCol) {color:#0033b3}val {color}{color:#000000}lambdaInt {color}= (forCol: {color:#000000}Column{color}) => callUDF({color:#000000}udfIntName{color}, forCol) {color:#0033b3}val {color}{color:#000000}lambdaStruct {color}= (forCol: {color:#000000}Column{color}) => callUDF({color:#000000}udfStructName{color}, forCol) {color:#0033b3}val {color}{color:#000000}cA {color}= callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"A"{color})) {color:#0033b3}val {color}{color:#000000}cB {color}= callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"B"{color})) {color:#0033b3}val {color}{color:#000000}cCString{color}: {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), {color:#000000}lambdaString{color}) {color:#0033b3}val {color}{color:#000000}cCInt{color}: {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), {color:#000000}lambdaInt{color}) {color:#0033b3}val {color}{color:#000000}cCStruc{color}: {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), {color:#000000}lambdaStruct{color}) {color:#0033b3}val {color}{color:#000000}dest {color}= {color:#000000}src{color}.withColumn({color:#067d17}"AStr"{color}, {color:#000000}cA{color}) .withColumn({color:#067d17}"BStr"{color}, 
{color:#000000}cB{color}) .withColumn({color:#067d17}"CString (Wrong)"{color}, {color:#000000}cCString{color}) .withColumn({color:#067d17}"CInt (OK)"{color}, {color:#000000}cCInt{color}) .withColumn({color:#067d17}"CStruct (Wrong)"{color}, {color:#000000}cCStruc{color}) {color:#000000}dest{color}.show({color:#0033b3}false{color}) {color:#000000}dest{color}.printSchema() h3. *Expected:* {noformat} +---+---+------------+----+----+---------------+------------+--------------------------------+ |A |B |C |AStr|BStr|CString |CInt |CStruct | +---+---+------------+----+----+---------------+------------+--------------------------------+ |1 |2 |[1, 2, 3] |2 |3 |[2, 3, 4] |[2, 3, 4] |[{2, 2}, {3, 3}, {4, 4}] | |2 |2 |[2, 2, 2] |3 |3 |[3, 3, 3] |[3, 3, 3] |[{3, 3}, {3, 3}, {3, 3}] | |3 |4 |[3, 4, 3, 4]|4 |5 |[4, 5, 4, 5] |[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]| +---+---+------------+----+----+---------------+------------+--------------------------------+ {noformat} h3. *Got:* {noformat} +---+---+------------+----+----+---------------+------------+--------------------------------+ |A |B |C |AStr|BStr|CString (Wrong)|CInt (Ok) |CStruct (Wrong) | +---+---+------------+----+----+---------------+------------+--------------------------------+ |1 |2 |[1, 2, 3] |2 |3 |[4, 4, 4] |[2, 3, 4] |[{4, 4}, {4, 4}, {4, 4}] | |2 |2 |[2, 2, 2] |3 |3 |[3, 3, 3] |[3, 3, 3] |[{3, 3}, {3, 3}, {3, 3}] | |3 |4 |[3, 4, 3, 4]|4 |5 |[5, 5, 5, 5] |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]| +---+---+------------+----+----+---------------+------------+--------------------------------+ {noformat} h3. *Observation* * Work correctly on Spark 3.0.2 * When UDF is registered as Java UDF, it works as supposed * The UDF is called the appropriate number of times (regardless if UDF is marked as deterministic or non-deterministic). * When debugged, the correct value is actually saved into the result array at first but every subsequent item processing overwrites the previous result values as well. 
Therefore the last item values filling the array is the final result. * When the UDF returns NULL/None it does not "overwrite” the prior array values nor is “overwritten” by subsequent non-NULL values. See with following UDF impelementation: {color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => { {color:#0033b3}if {color}(num == {color:#1750eb}3{color}) { {color:#000000}None {color} } {color:#0033b3}else {color}{ Some((num + {color:#1750eb}1{color}).toString) } }) > Scala UDF returning string or complex type applied to array members returns > wrong data > -------------------------------------------------------------------------------------- > > Key: SPARK-35371 > URL: https://issues.apache.org/jira/browse/SPARK-35371 > Project: Spark > Issue Type: Bug > Components: SQL > Affects Versions: 3.1.1 > Reporter: David Benedeki > Priority: Major > > When using an UDF returning string or complex type (Struct) on array members > the resulting array consists of the last array member UDF result. > h3. 
*Example code:* > {color:#0033b3}import {color}org.apache.spark.sql.{{color:#000000}Column, > SparkSession} > import org.apache.spark.sql.functions{color}.\{callUDF, col, transform, udf} > {color:#0033b3}val {color}{color:#000000}sparkBuilder{color}: > {color:#000000}SparkSession{color}.{color:#000000}Builder {color}= > {color:#000000}SparkSession{color}.builder() > .master({color:#067d17}"local[*]"{color}) > .appName({color:#067d17}s"Udf Bug Demo"{color}) > .config({color:#067d17}"spark.ui.enabled"{color}, > {color:#067d17}"false"{color}) > .config({color:#067d17}"spark.debug.maxToStringFields"{color}, > {color:#1750eb}100{color}) > {color:#0033b3}val {color}{color:#000000}spark{color}: > {color:#000000}SparkSession {color}= {color:#000000}sparkBuilder{color} > .config({color:#067d17}"spark.driver.bindAddress"{color}, > {color:#067d17}"127.0.0.1"{color}) > .config({color:#067d17}"spark.driver.host"{color}, > {color:#067d17}"127.0.0.1"{color}) > .getOrCreate() > {color:#0033b3}import > {color}{color:#000000}spark{color}.{color:#000000}implicits{color}._ > {color:#0033b3}case class {color}{color:#000000}Foo{color}(num: Int, s: > {color:#007e8a}String{color}) > {color:#0033b3}val {color}{color:#000000}src {color}= > {color:#871094}Seq{color}( > ({color:#1750eb}1{color}, {color:#1750eb}2{color}, > Array({color:#1750eb}1{color}, {color:#1750eb}2{color}, > {color:#1750eb}3{color})), > ({color:#1750eb}2{color}, {color:#1750eb}2{color}, > Array({color:#1750eb}2{color}, {color:#1750eb}2{color}, > {color:#1750eb}2{color})), > ({color:#1750eb}3{color}, {color:#1750eb}4{color}, > Array({color:#1750eb}3{color}, {color:#1750eb}4{color}, > {color:#1750eb}3{color}, {color:#1750eb}4{color})) > ).toDF({color:#067d17}"A"{color}, {color:#067d17}"B"{color}, > {color:#067d17}"C"{color}) > {color:#0033b3}val {color}{color:#000000}udfStringName {color}= > {color:#067d17}"UdfString"{color}{color:#0033b3}val > {color}{color:#000000}udfIntName {color}= > 
{color:#067d17}"UdfInt"{color}{color:#0033b3}val > {color}{color:#000000}udfStructName {color}= > {color:#067d17}"UdfStruct"{color}{color:#0033b3}val > {color}{color:#000000}udfString {color}= udf((num: Int) => { > (num + {color:#1750eb}1{color}).toString > }) > > {color:#000000}spark{color}.udf.register({color:#000000}udfStringName{color}, > {color:#000000}udfString{color}) > {color:#0033b3}val {color}{color:#000000}udfInt {color}= udf((num: Int) => { > num + {color:#1750eb}1 > }) > spark{color}.udf.register({color:#000000}udfIntName{color}, > {color:#000000}udfInt{color}) > {color:#0033b3}val {color}{color:#000000}udfStruct {color}= udf((num: Int) => > { > Foo(num + {color:#1750eb}1{color}, (num + {color:#1750eb}1{color}).toString) > }) > > {color:#000000}spark{color}.udf.register({color:#000000}udfStructName{color}, > {color:#000000}udfStruct{color}) > {color:#0033b3}val {color}{color:#000000}lambdaString {color}= (forCol: > {color:#000000}Column{color}) => callUDF({color:#000000}udfStringName{color}, > forCol) > {color:#0033b3}val {color}{color:#000000}lambdaInt {color}= (forCol: > {color:#000000}Column{color}) => callUDF({color:#000000}udfIntName{color}, > forCol) > {color:#0033b3}val {color}{color:#000000}lambdaStruct {color}= (forCol: > {color:#000000}Column{color}) => callUDF({color:#000000}udfStructName{color}, > forCol) > {color:#0033b3}val {color}{color:#000000}cA {color}= > callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"A"{color})) > {color:#0033b3}val {color}{color:#000000}cB {color}= > callUDF({color:#000000}udfStringName{color}, col({color:#067d17}"B"{color})) > {color:#0033b3}val {color}{color:#000000}cCString{color}: > {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), > {color:#000000}lambdaString{color}) > {color:#0033b3}val {color}{color:#000000}cCInt{color}: {color:#000000}Column > {color}= transform(col({color:#067d17}"C"{color}), > {color:#000000}lambdaInt{color}) > {color:#0033b3}val 
{color}{color:#000000}cCStruc{color}: > {color:#000000}Column {color}= transform(col({color:#067d17}"C"{color}), > {color:#000000}lambdaStruct{color}) > {color:#0033b3}val {color}{color:#000000}dest {color}= > {color:#000000}src{color}.withColumn({color:#067d17}"AStr"{color}, > {color:#000000}cA{color}) > .withColumn({color:#067d17}"BStr"{color}, {color:#000000}cB{color}) > .withColumn({color:#067d17}"CString (Wrong)"{color}, > {color:#000000}cCString{color}) > .withColumn({color:#067d17}"CInt (OK)"{color}, {color:#000000}cCInt{color}) > .withColumn({color:#067d17}"CStruct (Wrong)"{color}, > {color:#000000}cCStruc{color}) > {color:#000000}dest{color}.show({color:#0033b3}false{color}) > {color:#000000}dest{color}.printSchema() > h3. *Expected:* > {noformat} > +---+---+------------+----+----+---------------+------------+--------------------------------+ > |A |B |C |AStr|BStr|CString |CInt |CStruct > | > +---+---+------------+----+----+---------------+------------+--------------------------------+ > |1 |2 |[1, 2, 3] |2 |3 |[2, 3, 4] |[2, 3, 4] |[{2, 2}, {3, 3}, > {4, 4}] | > |2 |2 |[2, 2, 2] |3 |3 |[3, 3, 3] |[3, 3, 3] |[{3, 3}, {3, 3}, > {3, 3}] | > |3 |4 |[3, 4, 3, 4]|4 |5 |[4, 5, 4, 5] |[4, 5, 4, 5]|[{4, 4}, {5, 5}, > {4, 4}, {5, 5}]| > +---+---+------------+----+----+---------------+------------+--------------------------------+ > {noformat} > h3. 
*Got:* > {noformat} > +---+---+------------+----+----+---------------+------------+--------------------------------+ > |A |B |C |AStr|BStr|CString (Wrong)|CInt (Ok) |CStruct (Wrong) > | > +---+---+------------+----+----+---------------+------------+--------------------------------+ > |1 |2 |[1, 2, 3] |2 |3 |[4, 4, 4] |[2, 3, 4] |[{4, 4}, {4, 4}, > {4, 4}] | > |2 |2 |[2, 2, 2] |3 |3 |[3, 3, 3] |[3, 3, 3] |[{3, 3}, {3, 3}, > {3, 3}] | > |3 |4 |[3, 4, 3, 4]|4 |5 |[5, 5, 5, 5] |[4, 5, 4, 5]|[{5, 5}, {5, 5}, > {5, 5}, {5, 5}]| > +---+---+------------+----+----+---------------+------------+--------------------------------+ > {noformat} > h3. *Observation* > * Work correctly on Spark 3.0.2 > * When UDF is registered as Java UDF, it works as supposed > * The UDF is called the appropriate number of times (regardless if UDF is > marked as deterministic or non-deterministic). > * When debugged, the correct value is actually saved into the result array > at first but every subsequent item processing overwrites the previous result > values as well. Therefore the last item values filling the array is the final > result. > * When the UDF returns NULL/None it does not "overwrite” the prior array > values nor is “overwritten” by subsequent non-NULL values. See with following > UDF impelementation: > {color:#0033b3}val {color}{color:#000000}udfString {color}= udf((num: Int) => > { > {color:#0033b3}if {color}(num == {color:#1750eb}3{color}) { > {color:#000000}None{color} } {color:#0033b3}else {color}{ > Some((num + {color:#1750eb}1{color}).toString) > } > }) > -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org