Shardul Mahadik created SPARK-38030:
---------------------------------------

             Summary: Query with cast containing non-nullable columns fails with AQE on Spark 3.1.1
                 Key: SPARK-38030
                 URL: https://issues.apache.org/jira/browse/SPARK-38030
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.1.1
            Reporter: Shardul Mahadik


One of our user queries failed in Spark 3.1.1 when using AQE, with the stacktrace shown below (some parts of the plan have been redacted, but the structure is preserved).

Debugging this issue, we found that the failure occurs when AQE calls [QueryPlan.canonicalized|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L402].

The query contains a cast over a column with non-nullable struct fields. Canonicalization [removes nullability information|https://github.com/apache/spark/blob/91db9a36a9ed74845908f14d21227d5267591653/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala#L45] from the child {{AttributeReference}} of the {{Cast}}; however, it does not remove nullability information from the {{Cast}}'s target data type. This causes [checkInputDataTypes|https://github.com/apache/spark/blob/branch-3.1/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala#L290] to return false, because the child is now nullable while the cast's target data type is not, leading to {{resolved=false}} and hence the {{UnresolvedException}}.
{code:java}
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: makeCopy, tree:
Exchange RoundRobinPartitioning(1), REPARTITION_BY_NUM, [id=#232]
+- Union
   :- Project [cast(columnA#30) as struct<...>]
   :  +- BatchScan[columnA#30] hive.tbl 
   +- Project [cast(columnA#35) as struct<...>]
      +- BatchScan[columnA#35] hive.tbl

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:475)
  at org.apache.spark.sql.catalyst.trees.TreeNode.makeCopy(TreeNode.scala:464)
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:87)
  at org.apache.spark.sql.execution.SparkPlan.makeCopy(SparkPlan.scala:58)
  at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:301)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:405)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:373)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:372)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:404)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$createQueryStages$2(AdaptiveSparkPlanExec.scala:447)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:447)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:184)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:179)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:279)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3696)
  at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2722)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
  at org.apache.spark.sql.Dataset.head(Dataset.scala:2722)
  at org.apache.spark.sql.Dataset.take(Dataset.scala:2929)
  at org.apache.spark.sql.Dataset.getRows(Dataset.scala:301)
  at org.apache.spark.sql.Dataset.showString(Dataset.scala:338)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:825)
  at org.apache.spark.sql.Dataset.show(Dataset.scala:784)
  ... 85 elided
Caused by: java.lang.reflect.InvocationTargetException: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
  at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
  at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$7(TreeNode.scala:508)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:77)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$makeCopy$1(TreeNode.scala:507)
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
  ... 127 more
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: '
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.nullable(unresolved.scala:150)
  at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5(basicPhysicalOperators.scala:655)
  at org.apache.spark.sql.execution.UnionExec.$anonfun$output$5$adapted(basicPhysicalOperators.scala:655)
  at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
  at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
  at scala.collection.immutable.List.exists(List.scala:89)
  at org.apache.spark.sql.execution.UnionExec.$anonfun$output$4(basicPhysicalOperators.scala:655)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at scala.collection.TraversableLike.map(TraversableLike.scala:238)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
  at scala.collection.immutable.List.map(List.scala:298)
  at org.apache.spark.sql.execution.UnionExec.output(basicPhysicalOperators.scala:653)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.<init>(ShuffleExchangeExec.scala:121)
  ... 135 more
{code}
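To illustrate the root cause in isolation, here is a minimal expression-level sketch (my own illustration rather than part of the failing query; it assumes a Spark 3.1.1 build with the catalyst internals on the classpath, e.g. in a spark-shell):
{code:java}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// A struct type with a non-nullable field, mirroring the column in the user query.
val nonNullableStruct = StructType(Seq(StructField("firstName", StringType, nullable = false)))
val child = AttributeReference("name", nonNullableStruct, nullable = false)()
val cast = Cast(child, nonNullableStruct)

println(cast.resolved)               // true
// Canonicalization makes the child's dataType nullable but leaves the Cast's target
// dataType non-nullable, so checkInputDataTypes fails for the canonicalized Cast.
println(cast.canonicalized.resolved) // observed false on 3.1.1
{code}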
Repro in Spark 3.1.1
{code:java}
  test("SPARK-XXXXX: Repro: Query with cast containing non-nullable columns 
fails with AQE") {
    import scala.collection.JavaConverters._
    withSQLConf(
      SQLConf.OPTIMIZER_EXCLUDED_RULES.key ->
        (ConvertToLocalRelation.ruleName + "," + 
PropagateEmptyRelation.ruleName),
      SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
      val nameType = StructType(Seq(StructField("firstName", StringType, 
nullable = false)))
      val schema = StructType(Seq(StructField("name", nameType, nullable = 
false)))
      // change column name from firstName to fname, should be irrelevant for 
Cast
      val newNameType = StructType(Seq(StructField("fname", StringType, 
nullable = false)))

      val df = spark.createDataFrame(List.empty[Row].asJava, schema)
      val df1 = df.withColumn("newName", 'name.cast(newNameType))
      val df2 = df1.union(df1).repartition(1)
      df2.show()
    }
  }
{code}

I believe that during canonicalization, the nullability hints should also be removed from the target data type of {{Cast}}, so that it matches the child's canonicalized representation.
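For illustration only, one possible direction (a rough sketch against the 3.1 {{Canonicalize}} rule, not a tested fix) would be to normalize the {{Cast}}'s target type in the same place where attribute nullability is dropped:
{code:java}
// Hypothetical sketch: extend Canonicalize.ignoreNamesTypes (sql/catalyst) so that a
// Cast's target dataType is also stripped of nullability, matching what is already
// done for AttributeReference. Other existing cases are elided.
private[expressions] def ignoreNamesTypes(e: Expression): Expression = e match {
  case a: AttributeReference =>
    AttributeReference("none", a.dataType.asNullable)(exprId = a.exprId)
  case c: Cast =>
    c.copy(dataType = c.dataType.asNullable) // hypothetical addition, untested
  case _ => e
}
{code}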

--------

This exact issue is not reproducible in master, because the code that would previously trigger access on an unresolved object is [now lazy|https://github.com/apache/spark/blob/7e5c3b216431b6a5e9a0786bf7cded694228cdee/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L132] and hence no longer triggers the issue. However, the root cause is still present in master, and other codepaths that depend on canonicalized representations can trigger the same issue, although I haven't been able to come up with a good example yet.


