This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new fab6d83ac70f [SPARK-48921][SQL] ScalaUDF encoders in subquery should be resolved for MergeInto
fab6d83ac70f is described below

commit fab6d83ac70f61925edb6eb0b1fb600fb59c9761
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Thu Jul 18 13:00:14 2024 -0700

[SPARK-48921][SQL] ScalaUDF encoders in subquery should be resolved for MergeInto

### What changes were proposed in this pull request?

We got a customer issue where a `MergeInto` query on an Iceberg table worked before but fails after upgrading to Spark 3.4. The error looks like

```
Caused by: org.apache.spark.SparkRuntimeException: Error while decoding: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object upcast(getcolumnbyordinal(0, StringType), StringType, - root class: java.lang.String).toString.
```

The source table of `MergeInto` uses a `ScalaUDF`. The error happens when Spark invokes the deserializer of the input encoder of the `ScalaUDF` and the deserializer is not resolved yet.

The encoders of a `ScalaUDF` are resolved by the rule `ResolveEncodersInUDF`, which is applied at the end of the analysis phase.

While rewriting `MergeInto` to a `ReplaceData` query, Spark creates an `Exists` subquery, and the `ScalaUDF` is part of the plan of that subquery. Note that the `ScalaUDF` is already resolved by the analyzer at this point.

Then the `ResolveSubquery` rule, which resolves subqueries, resolves a subquery plan only if it is not resolved yet. Because the subquery containing the `ScalaUDF` is already resolved, the rule skips it, so `ResolveEncodersInUDF` is never applied to it. As a result, the analyzed `ReplaceData` query contains a `ScalaUDF` with unresolved encoders, which causes the error.

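The ordering problem can be illustrated with a toy model. This is only a sketch: `Plan`, `UdfFilter`, `Merge`, `Replace`, and `RuleOrderSketch` are hypothetical names and not Spark classes, and the model assumes, as a simplification of the behavior described above, that a plan is no longer visited by encoder resolution once it has been moved into a subquery.

```scala
// Toy model only: these types are hypothetical and are not Spark classes.
sealed trait Plan
// A filter that uses a UDF whose input encoder may or may not be resolved.
final case class UdfFilter(encoderResolved: Boolean) extends Plan
// A MERGE-like command that reads from a source plan.
final case class Merge(source: Plan) extends Plan
// The rewritten command: the source now lives inside an EXISTS-style subquery.
final case class Replace(existsSubquery: Plan) extends Plan

object RuleOrderSketch {
  // Resolves encoders on the operators it can reach. In this simplified model
  // it never descends into a subquery plan.
  def resolveEncoders(plan: Plan): Plan = plan match {
    case UdfFilter(_)  => UdfFilter(encoderResolved = true)
    case Merge(source) => Merge(resolveEncoders(source))
    case r: Replace    => r // the subquery plan is not visited
  }

  // Moves the source plan into a subquery, as the rewrite is described above.
  def rewriteMerge(plan: Plan): Plan = plan match {
    case Merge(source) => Replace(existsSubquery = source)
    case other         => other
  }

  def main(args: Array[String]): Unit = {
    val merge = Merge(UdfFilter(encoderResolved = false))
    // Rewrite first, then encoder resolution: the UDF is already hidden.
    val rewriteFirst = resolveEncoders(rewriteMerge(merge))
    // Encoder resolution first, then rewrite: the UDF is resolved in time.
    val encodersFirst = rewriteMerge(resolveEncoders(merge))
    println(s"rewrite first:  $rewriteFirst")
    println(s"encoders first: $encodersFirst")
  }
}
```

In this model, rewriting first leaves `encoderResolved = false` inside `Replace`, while resolving encoders first carries `encoderResolved = true` into the subquery.
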
This patch moves the DML rewrite rules (`RewriteDeleteFromTable`, `RewriteUpdateTable`, `RewriteMergeIntoTable`) into a separate batch that runs after `ResolveEncodersInUDF`, so that the `ScalaUDF` in the subquery plan is fully analyzed before `MergeInto` is rewritten.

### Why are the changes needed?

Fixing a production query error.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a user-facing issue.

### How was this patch tested?

Manually tested with a `MergeInto` query and added a unit test.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47380 from viirya/fix_subquery_resolve.

Lead-authored-by: Liang-Chi Hsieh <vii...@gmail.com>
Co-authored-by: Kent Yao <y...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 11 ++-
 .../analysis/ResolveEncodersInUDFSuite.scala       | 96 ++++++++++++++++++++++
 2 files changed, 104 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 95e2ddd40af1..1b194da5ab0a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -325,9 +325,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       new ResolveIdentifierClause(earlyBatches) ::
       ResolveUnion ::
       ResolveRowLevelCommandAssignments ::
-      RewriteDeleteFromTable ::
-      RewriteUpdateTable ::
-      RewriteMergeIntoTable ::
       MoveParameterizedQueriesDown ::
       BindParameters ::
       typeCoercionRules() ++
@@ -354,6 +351,14 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       UpdateAttributeNullability),
     Batch("UDF", Once,
       ResolveEncodersInUDF),
+    // The rewrite rules might move resolved query plan into subquery. Once the resolved plan
+    // contains ScalaUDF, their encoders won't be resolved if `ResolveEncodersInUDF` is not
+    // applied before the rewrite rules. So we need to apply `ResolveEncodersInUDF` before the
+    // rewrite rules.
+    Batch("DML rewrite", fixedPoint,
+      RewriteDeleteFromTable,
+      RewriteUpdateTable,
+      RewriteMergeIntoTable),
     Batch("Subquery", Once,
       UpdateOuterReferences),
     Batch("Cleanup", fixedPoint,
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala
new file mode 100644
index 000000000000..e50aa8c3a3f7
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveEncodersInUDFSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Exists, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, Filter, MergeIntoTable, ReplaceData, UpdateAction}
+import org.apache.spark.sql.catalyst.trees.TreePattern
+import org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTable
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class ResolveEncodersInUDFSuite extends AnalysisTest {
+  test("SPARK-48921: ScalaUDF encoders in subquery should be resolved for MergeInto") {
+    val table = new InMemoryRowLevelOperationTable("table",
+      StructType(StructField("a", IntegerType) ::
+        StructField("b", DoubleType) ::
+        StructField("c", StringType) :: Nil),
+      Array.empty,
+      new java.util.HashMap[String, String]()
+    )
+    val relation = DataSourceV2Relation(table,
+      Seq(AttributeReference("a", IntegerType)(),
+        AttributeReference("b", DoubleType)(),
+        AttributeReference("c", StringType)()),
+      None,
+      None,
+      CaseInsensitiveStringMap.empty()
+    )
+
+
+    val string = relation.output(2)
+    val udf = ScalaUDF((_: String) => "x", StringType, string :: Nil,
+      Option(ExpressionEncoder[String]()) :: Nil)
+
+    val mergeIntoSource =
+      relation
+        .where($"c" === udf)
+        .select($"a", $"b")
+        .limit(1)
+    val cond = mergeIntoSource.output(0) == relation.output(0) &&
+      mergeIntoSource.output(1) == relation.output(1)
+
+    val mergePlan = MergeIntoTable(
+      relation,
+      mergeIntoSource,
+      cond,
+      Seq(UpdateAction(None,
+        Seq(Assignment(relation.output(0), relation.output(0)),
+          Assignment(relation.output(1), relation.output(1)),
+          Assignment(relation.output(2), relation.output(2))))),
+      Seq.empty,
+      Seq.empty,
+      withSchemaEvolution = false)
+
+    val replaceData = mergePlan.analyze.asInstanceOf[ReplaceData]
+
+    val existsPlans = replaceData.groupFilterCondition.map(_.collect {
+      case e: Exists =>
+        e.plan.collect {
+          case f: Filter if f.containsPattern(TreePattern.SCALA_UDF) => f
+        }
+    }.flatten)
+
+    assert(existsPlans.isDefined)
+
+    val udfs = existsPlans.get.map(_.expressions.flatMap(e => e.collect {
+      case s: ScalaUDF =>
+        assert(s.inputEncoders.nonEmpty)
+        val encoder = s.inputEncoders.head
+        assert(encoder.isDefined)
+        assert(encoder.get.objDeserializer.resolved)
+
+        s
+    })).flatten
+    assert(udfs.size == 1)
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org