[ https://issues.apache.org/jira/browse/SPARK-38030?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482233#comment-17482233 ]
Shardul Mahadik commented on SPARK-38030:
-----------------------------------------

I plan to create a PR to change the canonicalization behavior of {{Cast}} so that nullability information is removed from the target data type of {{Cast}} during canonicalization. However, the canonicalization implementation has changed drastically between Spark 3.1.1 and master, so I will probably create two PRs: one for master and one for branch-3.1. Sketches of the nullability mismatch and of the intended change follow the quoted issue description below.

> Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-38030
>                 URL: https://issues.apache.org/jira/browse/SPARK-38030
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Shardul Mahadik
>            Priority: Major
>
> One of our user queries failed in Spark 3.1.1 when using AQE, with the stacktrace below (some parts of the plan have been redacted, but the structure is preserved).
> Debugging this issue, we found that the failure was within AQE calling [QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402].
> The query contains a cast over a column with non-nullable struct fields. Canonicalization [removes nullability information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45] from the child {{AttributeReference}} of the Cast; however, it does not remove nullability information from the Cast's target dataType. This causes [checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290] to return false because the child is now nullable while the cast's target data type is not, leading to {{resolved=false}} and hence the {{UnresolvedException}}.
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
> Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
> +- Union
>    :- Project [cast(columnA#30) as struct<...>]
>    :  +- BatchScan[columnA#30] hive.tbl
>    +- Project [cast(columnA#35) as struct<...>]
>       +- BatchScan[columnA#35] hive.tbl
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
>   at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
>   at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179)
>   at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:279)
>   at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
>   at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
>   at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
>   at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
>   at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
>   at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
>   at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>   at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
>   at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
>   at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
>   at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
>   at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
>   at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
>   ... 85 elided
> Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>   at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>   at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>   at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:508)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:77)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:507)
>   at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>   ... 127 more
> Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
>   at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.nullable(unresolved.scala:150)
>   at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5(basicPhysicalOperators.scala:655)
>   at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5$adapted(basicPhysicalOperators.scala:655)
>   at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
>   at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
>   at scala.collection.immutable.List.exists(List.scala:89)
>   at org.apache.spark.sql.execution.UnionExec.$anonfun$output$4(basicPhysicalOperators.scala:655)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>   at scala.collection.immutable.List.foreach(List.scala:392)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:238)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
>   at scala.collection.immutable.List.map(List.scala:298)
>   at org.apache.spark.sql.execution.UnionExec.output(basicPhysicalOperators.scala:653)
>   at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.<init>(ShuffleExchangeExec.scala:121)
>   ... 135 more
> {code}
> Repro in Spark 3.1.1:
> {code:java}
>   test("SPARK-XXXXX: Repro: Query with cast containing non-nullable columns fails with AQE") {
>     import scala.collection.JavaConverters._
>     withSQLConf(
>       SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
>         (ConvertToLocalRelation.ruleName + "," + PropagateEmptyRelation.ruleName),
>       SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
>       val nameType = StructType(Seq(StructField("firstName", StringType, nullable = false)))
>       val schema = StructType(Seq(StructField("name", nameType, nullable = false)))
>       // change column name from firstName to fname; should be irrelevant for Cast
>       val newNameType = StructType(Seq(StructField("fname", StringType, nullable = false)))
>       val df = spark.createDataFrame(List.empty[Row].asJava, schema)
>       val df1 = df.withColumn("newName", 'name.cast(newNameType))
>       val df2 = df1.union(df1).repartition(1)
>       df2.show()
>     }
>   }
> {code}
> I believe that during canonicalization, the nullability hints should also be removed from the target data type of {{Cast}} so that it matches the child's canonicalized representation.
> --------
> This exact issue is not reproducible in master, because the code that would previously trigger access on an unresolved object is [now lazy|https://github.com/apache/spark/blob/7e5c3b216431b6a5e9a0786bf7cded694228cdee/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L132] and hence does not trigger the issue. However, the root cause is still present in master, and some other code path that depends on canonicalized representations can trigger the same issue, although I haven't been able to come up with a good example yet.
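For context, here is a minimal expression-level illustration of the mismatch. This is a sketch against the Spark 3.1 Catalyst internals, not taken from the report above, and the column/field names are made up:

{code:java}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A column with a non-nullable struct field, cast to an equally
// non-nullable target type. Before canonicalization the Cast is resolved.
val nonNullStruct = StructType(Seq(StructField("fname", StringType, nullable = false)))
val attr = AttributeReference("name", nonNullStruct, nullable = false)()
val cast = Cast(attr, nonNullStruct)
assert(cast.resolved)

// Canonicalization rewrites the child using dataType.asNullable but leaves the
// Cast's target dataType untouched, so checkInputDataTypes now rejects casting
// a nullable field to a non-nullable one and the canonicalized Cast is unresolved.
assert(!cast.canonicalized.resolved)
{code}

Downstream code such as AQE's makeCopy path assumes that canonicalized plans stay resolved, which is where the {{UnresolvedException}} above comes from.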
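A rough outline of the change I have in mind: strip nullability from the Cast's target data type during canonicalization, mirroring what {{asNullable}} already does to the child {{AttributeReference}}. The helper below is hypothetical; its name and placement are illustrative, not the actual patch:

{code:java}
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}

// Hypothetical helper: recursively drop nullability hints from a data type.
def removeNullability(dt: DataType): DataType = dt match {
  case StructType(fields) =>
    StructType(fields.map(f =>
      f.copy(dataType = removeNullability(f.dataType), nullable = true)))
  case ArrayType(elementType, _) =>
    ArrayType(removeNullability(elementType), containsNull = true)
  case MapType(keyType, valueType, _) =>
    MapType(removeNullability(keyType), removeNullability(valueType),
      valueContainsNull = true)
  case other => other
}

// During canonicalization, a Cast would then be rewritten roughly as
//   case c: Cast => c.copy(dataType = removeNullability(c.dataType))
// so that its target type lines up with the already-nullable canonicalized child.
{code}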