This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fab6d83ac70f [SPARK-48921][SQL] ScalaUDF encoders in subquery should 
be resolved for MergeInto
fab6d83ac70f is described below

commit fab6d83ac70f61925edb6eb0b1fb600fb59c9761
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Thu Jul 18 13:00:14 2024 -0700

    [SPARK-48921][SQL] ScalaUDF encoders in subquery should be resolved for 
MergeInto
    
    ### What changes were proposed in this pull request?
    
    We received a customer report that a `MergeInto` query on an Iceberg table
    that worked before no longer works after upgrading to Spark 3.4.
    
    The error looks like
    
    ```
    Caused by: org.apache.spark.SparkRuntimeException: Error while decoding: 
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
nullable on unresolved object
    upcast(getcolumnbyordinal(0, StringType), StringType, - root class: 
java.lang.String).toString.
    ```
    
    The source table of `MergeInto` uses `ScalaUDF`. The error happens when
    Spark invokes the deserializer of the input encoder of the `ScalaUDF` while
    that deserializer is not resolved yet.
    
    The encoders of `ScalaUDF` are resolved by the rule `ResolveEncodersInUDF`,
    which is applied at the end of the analysis phase.
    
    While rewriting `MergeInto` into a `ReplaceData` query, Spark creates an
    `Exists` subquery, and the `ScalaUDF` is part of the plan of that subquery.
    Note that the `ScalaUDF` is already resolved by the analyzer at this point.
    
    Then the `ResolveSubquery` rule, which resolves subqueries, only resolves
    a subquery plan if it is not resolved yet. Because the subquery containing
    `ScalaUDF` is already resolved, the rule skips it, so `ResolveEncodersInUDF`
    is never applied to it. As a result, the analyzed `ReplaceData` query
    contains a `ScalaUDF` whose encoders are unresolved, which causes the error.
    
    This patch modifies `ResolveSubquery` so it will resolve subquery plan if 
it is not analyzed to make sure subquery plan is fully analyzed.
    
    This patch applies the `ResolveEncodersInUDF` rule before rewriting
    `MergeInto` (the rewrite rules are moved into a later batch) to make sure
    the `ScalaUDF` in the subquery plan is fully analyzed.
    
    ### Why are the changes needed?
    
    Fixing production query error.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, fixing user-facing issue.
    
    ### How was this patch tested?
    
    Manually tested with a `MergeInto` query and added a unit test.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47380 from viirya/fix_subquery_resolve.
    
    Lead-authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Co-authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 11 ++-
 .../analysis/ResolveEncodersInUDFSuite.scala       | 96 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 95e2ddd40af1..1b194da5ab0a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -325,9 +325,6 @@ class Analyzer(override val catalogManager: CatalogManager) 
extends RuleExecutor
       new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
       ResolveRowLevelCommandAssignments ::
-      RewriteDeleteFromTable ::
-      RewriteUpdateTable ::
-      RewriteMergeIntoTable ::
       MoveParameterizedQueriesDown ::
       BindParameters ::
       typeCoercionRules() ++
@@ -354,6 +351,14 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       UpdateAttributeNullability),
     Batch("UDF", Once,
       ResolveEncodersInUDF),
+    // The rewrite rules might move resolved query plan into subquery. Once 
the resolved plan
+    // contains ScalaUDF, their encoders won't be resolved if 
`ResolveEncodersInUDF` is not
+    // applied before the rewrite rules. So we need to apply 
`ResolveEncodersInUDF` before the
+    // rewrite rules.
+    Batch("DML rewrite", fixedPoint,
+      RewriteDeleteFromTable,
+      RewriteUpdateTable,
+      RewriteMergeIntoTable),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala
new file mode 100644
index 000000000000..e50aa8c3a3f7
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Exists, 
ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, 
MergeIntoTable, ReplaceData, UpdateAction}
+import org.apache.spark.sql.catalyst.trees.TreePattern
+import org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTable
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, 
StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ResolveEncodersInUDFSuite extends AnalysisTest { // regression suite for SPARK-48921
+  test("SPARK-48921: ScalaUDF encoders in subquery should be resolved for 
MergeInto") {
+    val table = new InMemoryRowLevelOperationTable("table",
+      StructType(StructField("a", IntegerType) ::
+        StructField("b", DoubleType) ::
+        StructField("c", StringType) :: Nil),
+      Array.empty,
+      new java.util.HashMap[String, String]()
+    )
+    val relation = DataSourceV2Relation(table,
+      Seq(AttributeReference("a", IntegerType)(),
+        AttributeReference("b", DoubleType)(),
+        AttributeReference("c", StringType)()),
+      None,
+      None,
+      CaseInsensitiveStringMap.empty()
+    )
+    // The UDF reads column `c` (output index 2) and carries an explicit String input encoder;
+    // the final assertions check that this encoder's deserializer ends up resolved.
+    val string = relation.output(2)
+    val udf = ScalaUDF((_: String) => "x", StringType, string :: Nil,
+      Option(ExpressionEncoder[String]()) :: Nil)
+    // Merge source: the target relation filtered by the UDF, projected to (a, b), limit 1.
+    val mergeIntoSource =
+      relation
+        .where($"c" === udf)
+        .select($"a", $"b")
+        .limit(1)
+    val cond = mergeIntoSource.output(0) == relation.output(0) &&
+      mergeIntoSource.output(1) == relation.output(1)
+    // NOTE(review): `cond` uses Scala `==`, not the DSL `===`, so it is a Boolean - confirm.
+    val mergePlan = MergeIntoTable(
+      relation,
+      mergeIntoSource,
+      cond,
+      Seq(UpdateAction(None,
+        Seq(Assignment(relation.output(0), relation.output(0)),
+          Assignment(relation.output(1), relation.output(1)),
+          Assignment(relation.output(2), relation.output(2))))),
+      Seq.empty,
+      Seq.empty,
+      withSchemaEvolution = false)
+    // The analyzed plan is cast to ReplaceData; any other plan type fails the test here.
+    val replaceData = mergePlan.analyze.asInstanceOf[ReplaceData]
+    // Collect Filter nodes that contain a ScalaUDF from Exists plans in the group filter.
+    val existsPlans = replaceData.groupFilterCondition.map(_.collect {
+      case e: Exists =>
+        e.plan.collect {
+          case f: Filter if f.containsPattern(TreePattern.SCALA_UDF) => f
+        }
+    }.flatten)
+    // The group filter condition must be present, otherwise nothing below is checked.
+    assert(existsPlans.isDefined)
+    // Each ScalaUDF found must have a defined first input encoder with a resolved deserializer.
+    val udfs = existsPlans.get.map(_.expressions.flatMap(e => e.collect {
+      case s: ScalaUDF =>
+        assert(s.inputEncoders.nonEmpty)
+        val encoder = s.inputEncoders.head
+        assert(encoder.isDefined)
+        assert(encoder.get.objDeserializer.resolved)
+        // Yield the UDF itself so the matches can be counted after the collect.
+        s
+    })).flatten
+    assert(udfs.size == 1)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to