This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 1a49de67e3f [SPARK-39177][SQL] Provide query context on map key not exists error when WSCG is off 1a49de67e3f is described below commit 1a49de67e3fa0d25e84540313688cde82d6001df Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Fri May 13 21:45:06 2022 +0800 [SPARK-39177][SQL] Provide query context on map key not exists error when WSCG is off ### What changes were proposed in this pull request? Similar to https://github.com/apache/spark/pull/36525, this PR provides query context for "map key not exists" runtime error when WSCG is off. ### Why are the changes needed? Enhance the runtime error query context for "map key not exists" runtime error. After changes, it works when the whole stage codegen is not available. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #36538 from gengliangwang/fixMapKeyContext. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> (cherry picked from commit 1afddf407436c3b315ec601fab5a4a1b2028e672) Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../catalyst/expressions/collectionOperations.scala | 6 ++++++ .../catalyst/expressions/complexTypeExtractors.scala | 13 ++++++++++--- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 19 +++++++++++++++++++ 3 files changed, 35 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala index 1b42ea5eb87..1bd934214f5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala @@ -2261,6 +2261,12 @@ case class ElementAt( override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): ElementAt = copy(left = newLeft, right = newRight) + + override def initQueryContext(): String = if (failOnError) { + origin.context + } else { + "" + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala index 45661c00c51..b84050c1837 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala @@ -339,7 +339,8 @@ trait GetArrayItemUtil { /** * Common trait for [[GetMapValue]] and [[ElementAt]]. */ -trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes { +trait GetMapValueUtil + extends BinaryExpression with ImplicitCastInputTypes with SupportQueryContext { // todo: current search is O(n), improve it. def getValueEval( @@ -365,7 +366,7 @@ trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes { if (!found) { if (failOnError) { - throw QueryExecutionErrors.mapKeyNotExistError(ordinal, keyType, origin.context) + throw QueryExecutionErrors.mapKeyNotExistError(ordinal, keyType, queryContext) } else { null } @@ -398,7 +399,7 @@ trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes { } val keyJavaType = CodeGenerator.javaType(keyType) - lazy val errorContext = ctx.addReferenceObj("errCtx", origin.context) + lazy val errorContext = ctx.addReferenceObj("errCtx", queryContext) val keyDt = ctx.addReferenceObj("keyType", keyType, keyType.getClass.getName) nullSafeCodeGen(ctx, ev, (eval1, eval2) => { val keyNotFoundBranch = if (failOnError) { @@ -488,4 +489,10 @@ case class GetMapValue( override protected def withNewChildrenInternal( newLeft: Expression, newRight: Expression): GetMapValue = copy(child = newLeft, key = newRight) + + override def initQueryContext(): String = if (failOnError) { + origin.context + } else { + "" + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 68db57ea364..21ce009a907 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -4404,6 +4404,25 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark } } + test("SPARK-39177: Query context of getting map value should be serialized to executors" + + " when WSCG is off") { + withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false", + SQLConf.ANSI_ENABLED.key -> "true") { + withTable("t") { + sql("create table t(m map<string, string>) using parquet") + sql("insert into t values map('a', 'b')") + Seq( + "select m['foo'] from t", + "select element_at(m, 'foo') from t").foreach { query => + val msg = intercept[SparkException] { + sql(query).collect() + }.getMessage + assert(msg.contains(query)) + } + } + } + } + test("SPARK-38589: try_avg should return null if overflow happens before merging") { val yearMonthDf = Seq(Int.MaxValue, Int.MaxValue, 2) .map(Period.ofMonths) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org