This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new f4dc254ee0bd [SPARK-47927][SQL] Fix nullability attribute in UDF 
decoder
f4dc254ee0bd is described below

commit f4dc254ee0bde2e8cac7e3f5b1a22017be9d6fba
Author: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
AuthorDate: Sun Apr 28 13:46:03 2024 +0800

    [SPARK-47927][SQL] Fix nullability attribute in UDF decoder
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a correctness issue by moving the batch that resolves udf 
decoders to after the `UpdateNullability` batch. This means we now derive a  
decoder with the updated attributes which fixes a correctness issue.
    
    I think the issue has existed since 
https://github.com/apache/spark/pull/28645 when udf support case class 
arguments was added. So therefore this issue should be present in all currently 
supported versions.
    
    ### Why are the changes needed?
    
    Currently the following code
    ```
    scala> val ds1 = Seq(1).toDS()
         | val ds2 = Seq[Int]().toDS()
         | val f = udf[Tuple1[Option[Int]],Tuple1[Option[Int]]](identity)
         | ds1.join(ds2, ds1("value") === ds2("value"), 
"left_outer").select(f(struct(ds2("value")))).collect()
    val ds1: org.apache.spark.sql.Dataset[Int] = [value: int]
    val ds2: org.apache.spark.sql.Dataset[Int] = [value: int]
    val f: org.apache.spark.sql.expressions.UserDefinedFunction = 
SparkUserDefinedFunction($Lambda$2481/0x00007f7f50961f086b1a2c9f,StructType(StructField(_1,IntegerType,true)),List(Some(class[_1[0]:
 int])),Some(class[_1[0]: int]),None,true,true)
    val res0: Array[org.apache.spark.sql.Row] = Array([[0]])
    ```
    results in an row containing `0` this is incorrect as the value should be 
`null`. Removing the udf call
    ```
    scala> ds1.join(ds2, ds1("value") === ds2("value"), 
"left_outer").select(struct(ds2("value"))).collect()
    val res1: Array[org.apache.spark.sql.Row] = Array([[null]])
    ```
    gives the correct value.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, fixes a correctness issue when using ScalaUDFs.
    
    ### How was this patch tested?
    
    Existing and new unit tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #46156 from eejbyfeldt/SPARK-47927.
    
    Authored-by: Emil Ejbyfeldt <eejbyfe...@liveintent.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 8b8ea60bd4f22ea5763a77bac2d51f25d2479be9)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala     |  4 ++--
 sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala   | 11 +++++++++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3b19b1a12e76..74061f2b8f21 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -332,11 +332,11 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       new ResolveHints.RemoveAllHints),
     Batch("Nondeterministic", Once,
       PullOutNondeterministic),
+    Batch("UpdateNullability", Once,
+      UpdateAttributeNullability),
     Batch("UDF", Once,
       HandleNullInputsForUDF,
       ResolveEncodersInUDF),
-    Batch("UpdateNullability", Once,
-      UpdateAttributeNullability),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 734fcebc80e7..2cee86be7d31 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -1060,4 +1060,15 @@ class UDFSuite extends QueryTest with SharedSparkSession 
{
     }.getCause.getCause
     assert(e.isInstanceOf[java.lang.ArithmeticException])
   }
+
+  test("SPARK-47927: Correctly pass null values derived from join to UDF") {
+    val f = udf[Tuple1[Option[Int]], Tuple1[Option[Int]]](identity)
+    val ds1 = Seq(1).toDS()
+    val ds2 = Seq[Int]().toDS()
+
+    checkAnswer(
+      ds1.join(ds2, ds1("value") === ds2("value"), "left_outer")
+        .select(f(struct(ds2("value").as("_1")))),
+      Row(Row(null)))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to