[ https://issues.apache.org/jira/browse/SPARK-35371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17342937#comment-17342937 ]

L. C. Hsieh commented on SPARK-35371:
-------------------------------------

Oh, I think it was fixed by SPARK-34829.

> Scala UDF returning string or complex type applied to array members returns wrong data
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-35371
>                 URL: https://issues.apache.org/jira/browse/SPARK-35371
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: David Benedeki
>            Priority: Major
>
> When using a UDF that returns a string or a complex type (struct) on array members
> (e.g. via {{transform}}), every element of the resulting array holds the UDF result
> of the last array member.
> h3. *Example code:*
> {code:scala}
> import org.apache.spark.sql.{Column, SparkSession}
> import org.apache.spark.sql.functions.{callUDF, col, transform, udf}
> val sparkBuilder: SparkSession.Builder = SparkSession.builder()
>   .master("local[*]")
>   .appName(s"Udf Bug Demo")
>   .config("spark.ui.enabled", "false")
>   .config("spark.debug.maxToStringFields", 100)
> val spark: SparkSession = sparkBuilder
>   .config("spark.driver.bindAddress", "127.0.0.1")
>   .config("spark.driver.host", "127.0.0.1")
>   .getOrCreate()
> import spark.implicits._
> case class Foo(num: Int, s: String)
> val src  = Seq(
>   (1, 2, Array(1, 2, 3)),
>   (2, 2, Array(2, 2, 2)),
>   (3, 4, Array(3, 4, 3, 4))
> ).toDF("A", "B", "C")
> val udfStringName = "UdfString"
> val udfIntName = "UdfInt"
> val udfStructName = "UdfStruct"
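> // Three Scala UDFs with the same "+ 1" logic, returning a String, an Int and a struct (Foo)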
> val udfString = udf((num: Int) => {
>   (num + 1).toString
> })
> spark.udf.register(udfStringName, udfString)
> val udfInt = udf((num: Int) => {
>   num + 1
> })
> spark.udf.register(udfIntName, udfInt)
> val udfStruct = udf((num: Int) => {
>   Foo(num + 1, (num + 1).toString)
> })
> spark.udf.register(udfStructName, udfStruct)
> val lambdaString = (forCol: Column) => callUDF(udfStringName, forCol)
> val lambdaInt = (forCol: Column) => callUDF(udfIntName, forCol)
> val lambdaStruct = (forCol: Column) => callUDF(udfStructName, forCol)
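> // The String UDF applied to the scalar columns A and B, and each UDF applied
> // to every element of the array column C via transform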
> val cA = callUDF(udfStringName, col("A"))
> val cB = callUDF(udfStringName, col("B"))
> val cCString: Column = transform(col("C"), lambdaString)
> val cCInt: Column = transform(col("C"), lambdaInt)
> val cCStruc: Column = transform(col("C"), lambdaStruct)
> val dest = src.withColumn("AStr", cA)
>   .withColumn("BStr", cB)
>   .withColumn("CString (Wrong)", cCString)
>   .withColumn("CInt (OK)", cCInt)
>   .withColumn("CStruct (Wrong)", cCStruc)
> dest.show(false)
> dest.printSchema()
> {code}
> h3. *Expected:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString        |CInt        |CStruct                         |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[2, 3, 4]      |[2, 3, 4]   |[{2, 2}, {3, 3}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[4, 5, 4, 5]   |[4, 5, 4, 5]|[{4, 4}, {5, 5}, {4, 4}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Got:*
> {noformat}
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |A  |B  |C           |AStr|BStr|CString (Wrong)|CInt (Ok)   |CStruct (Wrong)                 |
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> |1  |2  |[1, 2, 3]   |2   |3   |[4, 4, 4]      |[2, 3, 4]   |[{4, 4}, {4, 4}, {4, 4}]        |
> |2  |2  |[2, 2, 2]   |3   |3   |[3, 3, 3]      |[3, 3, 3]   |[{3, 3}, {3, 3}, {3, 3}]        |
> |3  |4  |[3, 4, 3, 4]|4   |5   |[5, 5, 5, 5]   |[4, 5, 4, 5]|[{5, 5}, {5, 5}, {5, 5}, {5, 5}]|
> +---+---+------------+----+----+---------------+------------+--------------------------------+
> {noformat}
> h3. *Observations*
>  * Works correctly on Spark 3.0.2.
>  * When the UDF is registered as a Java UDF, it works as expected (see the sketch after the code example below).
>  * The UDF is called the appropriate number of times, regardless of whether it is marked as deterministic or non-deterministic.
>  * When debugged, the correct value is initially saved into the result array, but processing each subsequent item overwrites the previously stored values as well. Hence the last item's values fill the whole array in the final result.
>  * When the UDF returns NULL/None, it neither "overwrites" the prior array values nor is "overwritten" by subsequent non-NULL values. See the following UDF implementation:
> {code:scala}
> val udfString = udf((num: Int) => {
>   if (num == 3) {
>     None
>   } else {
>     Some((num + 1).toString)
>   }
> })
> {code}
>  
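> A minimal sketch of the Java UDF registration mentioned in the observations (the variant that works as expected). It assumes the {{org.apache.spark.sql.api.java.UDF1}} interface and reuses {{udfStringName}} and {{spark}} from the example code above:
> {code:scala}
> import org.apache.spark.sql.api.java.UDF1
> import org.apache.spark.sql.types.DataTypes
> // Same "+ 1" logic as udfString, but registered through the Java UDF interface;
> // assumption: this mirrors the "Java UDF" variant referred to in the observations.
> spark.udf.register(udfStringName, new UDF1[java.lang.Integer, String] {
>   override def call(num: java.lang.Integer): String = (num + 1).toString
> }, DataTypes.StringType)
> {code}
> Re-registering under the same name and re-running the {{transform}} example makes the two code paths easy to compare.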



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
