[ 
https://issues.apache.org/jira/browse/SPARK-38030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17489016#comment-17489016
 ] 

Shardul Mahadik commented on SPARK-38030:
-----------------------------------------

During the PR reviews, we used a different approach than the one mentioned in 
the ticket. Instead of removing nullability hints from Cast's target DataType 
to match AttributeReference, we instead preserved nullability hints during 
canonicalization of AttributeReference. (more details in [this 
comment|https://github.com/apache/spark/pull/35332#discussion_r793367870])

[~xkrogen] The error is happening at a leaf node {{AttributeReference}}, so 
only that one node is printed, there are no children. Usually the error message 
would say {{"tree: 'columnName"}}, but the canonicalization process strips out 
column name and makes them empty strings.

> Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-38030
>                 URL: https://issues.apache.org/jira/browse/SPARK-38030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Shardul Mahadik
>            Assignee: Shardul Mahadik
>            Priority: Major
>             Fix For: 3.3.0
>
>
> One of our user queries failed in Spark 3.1.1 when using AQE with the 
> following stacktrace mentioned below (some parts of the plan have been 
> redacted, but the structure is preserved).
> Debugging this issue, we found that the failure was within AQE calling 
> [QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402].
> The query contains a cast over a column with non-nullable struct fields. 
> Canonicalization [removes nullability 
> information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45]
>  from the child {{AttributeReference}} of the Cast, however it does not 
> remove nullability information from the Cast's target dataType. This causes 
> the 
> [checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290]
>  to return false because the child is now nullable and cast target data type 
> is not, leading to {{resolved=false}} and hence the {{UnresolvedException}}.
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, 
> tree:
> Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
> +- Union
>    :- Project [cast(columnA#30) as struct<...>]
>    :  +- BatchScan[columnA#30] hive.tbl 
>    +- Project [cast(columnA#35) as struct<...>]
>       +- BatchScan[columnA#35] hive.tbl
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
>   at 
> org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179)
>   at 
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:279)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
>   at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>   at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
>   ... 85 elided
> Caused by: java.lang.reflect.InvocationTargetException: 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> nullable on unresolved object, tree: '
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:508)
>   at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:77)
>   at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:507)
>   at 
> org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   ... 127 more
> Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: 
> Invalid call to nullable on unresolved object, tree: '
>   at 
> org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.nullable(unresolved.scala:150)
>   at 
> org.apache.spark.sql.execution.UnionExec.$anonfun$output$5(basicPhysicalOperators.scala:655)
>   at 
> org.apache.spark.sql.execution.UnionExec.$anonfun$output$5$adapted(basicPhysicalOperators.scala:655)
>   at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
>   at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
>   at scala.collection.immutable.List.exists(List.scala:89)
>   at 
> org.apache.spark.sql.execution.UnionExec.$anonfun$output$4(basicPhysicalOperators.scala:655)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at 
> org.apache.spark.sql.execution.UnionExec.output(basicPhysicalOperators.scala:653)
>   at 
> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.<init>(ShuffleExchangeExec.scala:121)
>   ... 135 more
> {code}
> Repro in Spark 3.1.1
> {code:java}
>   test("SPARK-XXXXX: Repro: Query with cast containing non-nullable columns 
> fails with AQE") {
>     import scala.collection.JavaConverters._
>     withSQLConf(
>       SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
>         (ConvertToLocalRelation.ruleName + "," + 
> PropagateEmptyRelation.ruleName),
>       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
>       val nameType = StructType(Seq(StructField("firstName", StringType, 
> nullable = false)))
>       val schema = StructType(Seq(StructField("name", nameType, nullable = 
> false)))
>       // change column name from firstName to fname, should be irrelevant for 
> Cast
>       val newNameType = StructType(Seq(StructField("fname", StringType, 
> nullable = false)))
>       val df = spark.createDataFrame(List.empty[Row].asJava, schema)
>       val df1 = df.withColumn("newName", 'name.cast(newNameType))
>       val df2 = df1.union(df1).repartition(1)
>       df2.show()
>     }
>   }
> {code}
> I believe that during canonicalization, the nullability hints should also be 
> removed from the target data type of {{Cast}} so that they match with the 
> child's canonicalized representation.
> --------
> This exact issue is not reproducible in master. This is because the code 
> which would previously trigger access on an unresolved object is [now 
> lazy|https://github.com/apache/spark/blob/7e5c3b216431b6a5e9a0786bf7cded694228cdee/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L132]
>  and hence does not trigger the issue. However the root cause is still 
> present in master, and some other codepath which depends on canonicalized 
> representations can trigger the same issue, although I haven't been able to 
> come up with a good example yet.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to