This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1afddf40743 [SPARK-39177][SQL] Provide query context on map key not 
exists error when WSCG is off
1afddf40743 is described below

commit 1afddf407436c3b315ec601fab5a4a1b2028e672
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Fri May 13 21:45:06 2022 +0800

    [SPARK-39177][SQL] Provide query context on map key not exists error when 
WSCG is off
    
    ### What changes were proposed in this pull request?
    
    Similar to https://github.com/apache/spark/pull/36525, this PR provides query context for the "map key not exists" runtime error when whole-stage codegen (WSCG) is off.
    
    ### Why are the changes needed?
    
    Enhance the query context for the "map key not exists" runtime error. After this change, the query context is also provided when whole-stage codegen is not available.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test (added to SQLQuerySuite).
    
    Closes #36538 from gengliangwang/fixMapKeyContext.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../catalyst/expressions/collectionOperations.scala   |  6 ++++++
 .../catalyst/expressions/complexTypeExtractors.scala  | 13 ++++++++++---
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala    | 19 +++++++++++++++++++
 3 files changed, 35 insertions(+), 3 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
index 1b42ea5eb87..1bd934214f5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
@@ -2261,6 +2261,12 @@ case class ElementAt(
 
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): ElementAt = copy(left = 
newLeft, right = newRight)
+
+  override def initQueryContext(): String = if (failOnError) {
+    origin.context
+  } else {
+    ""
+  }
 }
 
 /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
index 45661c00c51..b84050c1837 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala
@@ -339,7 +339,8 @@ trait GetArrayItemUtil {
 /**
  * Common trait for [[GetMapValue]] and [[ElementAt]].
  */
-trait GetMapValueUtil extends BinaryExpression with ImplicitCastInputTypes {
+trait GetMapValueUtil
+  extends BinaryExpression with ImplicitCastInputTypes with 
SupportQueryContext {
 
   // todo: current search is O(n), improve it.
   def getValueEval(
@@ -365,7 +366,7 @@ trait GetMapValueUtil extends BinaryExpression with 
ImplicitCastInputTypes {
 
     if (!found) {
       if (failOnError) {
-        throw QueryExecutionErrors.mapKeyNotExistError(ordinal, keyType, 
origin.context)
+        throw QueryExecutionErrors.mapKeyNotExistError(ordinal, keyType, 
queryContext)
       } else {
         null
       }
@@ -398,7 +399,7 @@ trait GetMapValueUtil extends BinaryExpression with 
ImplicitCastInputTypes {
     }
 
     val keyJavaType = CodeGenerator.javaType(keyType)
-    lazy val errorContext = ctx.addReferenceObj("errCtx", origin.context)
+    lazy val errorContext = ctx.addReferenceObj("errCtx", queryContext)
     val keyDt = ctx.addReferenceObj("keyType", keyType, 
keyType.getClass.getName)
     nullSafeCodeGen(ctx, ev, (eval1, eval2) => {
       val keyNotFoundBranch = if (failOnError) {
@@ -488,4 +489,10 @@ case class GetMapValue(
   override protected def withNewChildrenInternal(
       newLeft: Expression, newRight: Expression): GetMapValue =
     copy(child = newLeft, key = newRight)
+
+  override def initQueryContext(): String = if (failOnError) {
+    origin.context
+  } else {
+    ""
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c0f609ad817..0355bd90d04 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -4357,6 +4357,25 @@ class SQLQuerySuite extends QueryTest with 
SharedSparkSession with AdaptiveSpark
     }
   }
 
+  test("SPARK-39177: Query context of getting map value should be serialized 
to executors" +
+    " when WSCG is off") {
+    withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
+      SQLConf.ANSI_ENABLED.key -> "true") {
+      withTable("t") {
+        sql("create table t(m map<string, string>) using parquet")
+        sql("insert into t values map('a', 'b')")
+        Seq(
+          "select m['foo'] from t",
+          "select element_at(m, 'foo') from t").foreach { query =>
+          val msg = intercept[SparkException] {
+            sql(query).collect()
+          }.getMessage
+          assert(msg.contains(query))
+        }
+      }
+    }
+  }
+
   test("SPARK-38589: try_avg should return null if overflow happens before 
merging") {
     val yearMonthDf = Seq(Int.MaxValue, Int.MaxValue, 2)
       .map(Period.ofMonths)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to