[ 
https://issues.apache.org/jira/browse/SPARK-38030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482233#comment-17482233
 ] 

Shardul Mahadik commented on SPARK-38030:
-----------------------------------------

I plan to open a PR that changes the canonicalization of {{Cast}} so that 
nullability information is also removed from its target data type. Because 
the canonicalization implementation has changed drastically between Spark 
3.1.1 and master, I will probably open two PRs: one for master and one for 
branch-3.1.
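
To illustrate the idea, here is a minimal, self-contained sketch. It does not 
use Spark's classes: {{DataType}}, {{StructField}}, {{StructType}}, {{Cast}}, 
{{asNullable}} and {{canCast}} below are simplified stand-ins, and 
{{canonicalizeChildOnly}} / {{canonicalizeBoth}} are hypothetical names chosen 
for this example. The actual change may look quite different.

```scala
// Simplified model of the problem; NOT the real Catalyst classes.
object CastCanonicalizationSketch {
  sealed trait DataType
  case object StringType extends DataType
  final case class StructField(name: String, dataType: DataType, nullable: Boolean)
  final case class StructType(fields: Seq[StructField]) extends DataType

  // Recursively mark every nested field as nullable.
  def asNullable(dt: DataType): DataType = dt match {
    case StructType(fields) =>
      StructType(fields.map(f => f.copy(dataType = asNullable(f.dataType), nullable = true)))
    case other => other
  }

  // Toy cast rule: field names may differ, but a nullable field
  // must not be cast to a non-nullable one.
  def canCast(from: DataType, to: DataType): Boolean = (from, to) match {
    case (StructType(fromFields), StructType(toFields)) =>
      fromFields.length == toFields.length &&
        fromFields.zip(toFields).forall { case (a, b) =>
          canCast(a.dataType, b.dataType) && (!a.nullable || b.nullable)
        }
    case (a, b) => a == b
  }

  // A cast is "resolved" here only if the child type can be cast to the target.
  final case class Cast(childType: DataType, target: DataType) {
    def resolved: Boolean = canCast(childType, target)
  }

  // Behavior described in the issue: only the child loses nullability info.
  def canonicalizeChildOnly(c: Cast): Cast =
    c.copy(childType = asNullable(c.childType))

  // Proposed behavior: the target data type is relaxed as well.
  def canonicalizeBoth(c: Cast): Cast =
    Cast(asNullable(c.childType), asNullable(c.target))

  def main(args: Array[String]): Unit = {
    val from = StructType(Seq(StructField("firstName", StringType, nullable = false)))
    val to = StructType(Seq(StructField("fname", StringType, nullable = false)))
    val cast = Cast(from, to)
    println(cast.resolved)                        // true
    println(canonicalizeChildOnly(cast).resolved) // false
    println(canonicalizeBoth(cast).resolved)      // true
  }
}
```

In this model the cast only stays resolved after canonicalization when both 
sides are relaxed together, which is the invariant the PR aims to restore.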

> Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-38030
>                 URL: https://issues.apache.org/jira/browse/SPARK-38030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Shardul Mahadik
>            Priority: Major
>
> One of our user queries failed in Spark 3.1.1 when using AQE, with the 
> stack trace shown below (some parts of the plan have been redacted, but 
> the structure is preserved).
> Debugging this issue, we found that the failure was within AQE calling 
> [QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402].
> The query contains a cast over a column with non-nullable struct fields. 
> Canonicalization [removes nullability 
> information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45]
>  from the child {{AttributeReference}} of the {{Cast}}, but it does not 
> remove nullability information from the {{Cast}}'s target data type. As a 
> result, 
> [checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290]
>  fails, because the child is now nullable while the cast target data type 
> is not. This leads to {{resolved=false}} and hence the 
> {{UnresolvedException}}.
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
> Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
> +- Union
>    :- Project [cast(columnA#30) as struct<...>]
>    :  +- BatchScan[columnA#30] hive.tbl 
>    +- Project [cast(columnA#35) as struct<...>]
>       +- BatchScan[columnA#35] hive.tbl
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:279)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
>   at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
>   ... 85 elided
> Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:508)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:77)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:507)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   ... 127 more
> Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
>   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.nullable(unresolved.scala:150)
>   at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5(basicPhysicalOperators.scala:655)
>   at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5$adapted(basicPhysicalOperators.scala:655)
>   at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
>   at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
>   at scala.collection.immutable.List.exists(List.scala:89)
>   at org.apache.spark.sql.execution.UnionExec.$anonfun$output$4(basicPhysicalOperators.scala:655)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at org.apache.spark.sql.execution.UnionExec.output(basicPhysicalOperators.scala:653)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.<init>(ShuffleExchangeExec.scala:121)
>   ... 135 more
> {code}
> Repro in Spark 3.1.1
> {code:java}
>   test("SPARK-XXXXX: Repro: Query with cast containing non-nullable columns fails with AQE") {
>     import scala.collection.JavaConverters._
>     withSQLConf(
>       SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
>         (ConvertToLocalRelation.ruleName + "," + PropagateEmptyRelation.ruleName),
>       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
>       val nameType = StructType(Seq(StructField("firstName", StringType, nullable = false)))
>       val schema = StructType(Seq(StructField("name", nameType, nullable = false)))
>       // change column name from firstName to fname, should be irrelevant for Cast
>       val newNameType = StructType(Seq(StructField("fname", StringType, nullable = false)))
>       val df = spark.createDataFrame(List.empty[Row].asJava, schema)
>       val df1 = df.withColumn("newName", 'name.cast(newNameType))
>       val df2 = df1.union(df1).repartition(1)
>       df2.show()
>     }
>   }
> {code}
> I believe that during canonicalization, the nullability hints should also be 
> removed from the target data type of {{Cast}} so that they match with the 
> child's canonicalized representation.
> --------
> This exact issue is not reproducible in master. This is because the code 
> which would previously trigger access on an unresolved object is [now 
> lazy|https://github.com/apache/spark/blob/7e5c3b216431b6a5e9a0786bf7cded694228cdee/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L132]
>  and hence does not trigger the issue. However, the root cause is still 
> present in master, and another code path that depends on canonicalized 
> representations could trigger the same issue, although I haven't been able 
> to come up with a good example yet.
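
The effect of that laziness can be sketched in isolation. The example below is 
a self-contained illustration, not Spark code: {{Plan}}, {{UnresolvedPlan}}, 
{{EagerExchange}}, {{LazyExchange}} and {{serializerWidth}} are hypothetical 
names, and it only assumes that some field in the exchange's constructor used 
to read the child's output eagerly.

```scala
import scala.util.Try

// Hypothetical stand-ins; none of these are real Spark classes.
object LazyOutputSketch {
  trait Plan { def outputSize: Int }

  // Models a canonicalized child whose output cannot be inspected.
  object UnresolvedPlan extends Plan {
    def outputSize: Int =
      throw new IllegalStateException("Invalid call on unresolved object")
  }

  // Eager field: the child's output is read while the object is constructed.
  final class EagerExchange(child: Plan) {
    val serializerWidth: Int = child.outputSize
  }

  // Lazy field: the child's output is read only on first access.
  final class LazyExchange(child: Plan) {
    lazy val serializerWidth: Int = child.outputSize
  }

  def main(args: Array[String]): Unit = {
    // Construction alone fails for the eager variant.
    println(Try(new EagerExchange(UnresolvedPlan)).isFailure) // true
    // Construction succeeds for the lazy variant...
    val exchange = new LazyExchange(UnresolvedPlan)
    // ...but the failure is still there once the field is actually used.
    println(Try(exchange.serializerWidth).isFailure) // true
  }
}
```

The lazy variant only postpones the failure: any code path that reads the 
field on a canonicalized plan would still hit the unresolved child.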


