[ 
https://issues.apache.org/jira/browse/SPARK-39854?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17965484#comment-17965484
 ] 

Juha Iso-Sipilä commented on SPARK-39854:
-----------------------------------------

I've hit this issue in a query the involves explode, complex nested structures, 
filtering lists within, etc. I've written the Scala examples as pytest which 
confirms this problem on 3.5.6 and 4.0.0. Disabling the recommended 
optimisations is a workaround.
{code:java}
from pyspark import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, struct, col


def test_spark_issue_39854():
    spark_session = (
        SparkSession.builder.config(
            "spark.sql.optimizer.expression.nestedPruning.enabled", "true"
        )
        .config("spark.sql.optimizer.nestedSchemaPruning.enabled", "true")
        .getOrCreate()
    )
    data = Row(
        b=Row(
            id="id00",
            data=[
                Row(
                    b1="vb1",
                    b2=101,
                    ex2=[
                        Row(fb1=False, fb2=11, fb3="t1"),
                        Row(fb1=True, fb2=12, fb3="t2"),
                    ],
                ),
                Row(
                    b1="vb2",
                    b2=102,
                    ex2=[
                        Row(fb1=False, fb2=13, fb3="t3"),
                        Row(fb1=True, fb2=14, fb3="t4"),
                    ],
                ),
            ],
            fa="tes",
            v="1.5",
        )
    )
    df = spark_session.createDataFrame([data])
    df.printSchema()
    df.show()
    df2 = df.withColumn("ex_b", explode("b.data.ex2")).withColumn(
        "ex_b2", explode("ex_b")
    )
    df2.printSchema()
    df2.show()
    df3 = df2.withColumn(
        "rt", struct(col("b.fa").alias("rt_fa"), col("b.v").alias("rt_v"))
    ).drop("b", "ex_b")
    df3.printSchema()
    df3.show()  # org.apache.spark.SparkException: [INTERNAL_ERROR] Couldn't 
find _extract_v#28 in [_extract_fa#29,ex_b2#9] SQLSTATE: XX000{code}
I managed to rewrite my query to avoid this issue but wasn't ideal and I don't 
know why a rewrite didn't trigger it. I am afraid I can't easily share the real 
example.

> Catalyst 'ColumnPruning' Optimizer does not play well with sql function 
> 'explode'
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-39854
>                 URL: https://issues.apache.org/jira/browse/SPARK-39854
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0, 3.2.1, 3.3.0, 3.2.2, 3.4.0
>         Environment: Spark version: the latest (3.4.0-SNAPSHOT)
> OS: Ubuntu 20.04
> JDK: Amazon corretto-11.0.14.1
>            Reporter: Jiaji Wu
>            Priority: Critical
>              Labels: pull-request-available
>
> The *ColumnPruning* optimizer batch does not always work with *explode* sql 
> function.
>  * Here's a code snippet to repro the issue:
>  
> {code:java}
> import spark.implicits._
> val testJson =
>   """{
>     | "b": {
>     |  "id": "id00",
>     |  "data": [{
>     |   "b1": "vb1",
>     |   "b2": 101,
>     |   "ex2": [
>     |    { "fb1": false, "fb2": 11, "fb3": "t1" },
>     |    { "fb1": true, "fb2": 12, "fb3": "t2" }
>     |   ]}, {
>     |   "b1": "vb2",
>     |   "b2": 102,
>     |   "ex2": [
>     |    { "fb1": false, "fb2": 13, "fb3": "t3" },
>     |    { "fb1": true, "fb2": 14, "fb3": "t4" }
>     |   ]}
>     |  ],
>     |  "fa": "tes",
>     |  "v": "1.5"
>     | }
>     |}
>     |""".stripMargin
> val df = spark.read.json((testJson :: Nil).toDS())
>   .withColumn("ex_b", explode($"b.data.ex2"))
>   .withColumn("ex_b2", explode($"ex_b"))
> val df1 = df
>   .withColumn("rt", struct(
>     $"b.fa".alias("rt_fa"),
>     $"b.v".alias("rt_v")
>   ))
>   .drop("b", "ex_b")
> df1.show(false){code}
>  * the result exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalStateException: Couldn't find 
> _extract_v#35 in [_extract_fa#36,ex_b2#13]
>     at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
>     at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
>     at 
> org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
>     at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
>     at scala.collection.Iterator.foreach(Iterator.scala:943)
>     at scala.collection.Iterator.foreach$(Iterator.scala:943)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
>     at scala.collection.IterableLike.foreach(IterableLike.scala:74)
>     at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
>     at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:286)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:698)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
>     at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1196)
>     at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1195)
>     at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:513)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:589)
>     at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1196)
>     at 
> org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1195)
>     at 
> org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:513)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:589)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
>     at 
> org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:528)
>     at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
>     at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
>     at scala.collection.immutable.List.map(List.scala:297)
>     at 
> org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:94)
>     at 
> org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
>     at 
> org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:58)
>     at 
> org.apache.spark.sql.execution.GenerateExec.codeGenCollection(GenerateExec.scala:232)
>     at 
> org.apache.spark.sql.execution.GenerateExec.doConsume(GenerateExec.scala:145)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:223)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
>     at 
> org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:216)
>     at 
> org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:265)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
>     at 
> org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:58)
>     at 
> org.apache.spark.sql.execution.GenerateExec.codeGenCollection(GenerateExec.scala:232)
>     at 
> org.apache.spark.sql.execution.GenerateExec.doConsume(GenerateExec.scala:145)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:223)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
>     at 
> org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:42)
>     at 
> org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:89)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:223)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
>     at 
> org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:216)
>     at 
> org.apache.spark.sql.execution.FilterExec.doConsume(basicPhysicalOperators.scala:265)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
>     at 
> org.apache.spark.sql.execution.RDDScanExec.consume(ExistingRDD.scala:153)
>     at 
> org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:485)
>     at 
> org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:458)
>     at 
> org.apache.spark.sql.execution.RDDScanExec.doProduce(ExistingRDD.scala:153)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.RDDScanExec.produce(ExistingRDD.scala:153)
>     at 
> org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:242)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:216)
>     at 
> org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
>     at 
> org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
>     at 
> org.apache.spark.sql.execution.FilterExec.doProduce(basicPhysicalOperators.scala:242)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.FilterExec.produce(basicPhysicalOperators.scala:216)
>     at 
> org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
>     at 
> org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
>     at 
> org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
>     at 
> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
>     at 
> org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
>     at 
> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
>     at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
>     at 
> org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:340)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:473)
>     at 
> org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
>     at 
> org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
>     at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3960)
>     at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2955)
>     at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3950)
>     at 
> org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512)
>     at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3948)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:111)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:171)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
>     at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
>     at 
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
>     at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3948)
>     at org.apache.spark.sql.Dataset.head(Dataset.scala:2955)
>     at org.apache.spark.sql.Dataset.take(Dataset.scala:3176)
>     at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
>     at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
>     at org.apache.spark.sql.Dataset.show(Dataset.scala:834)
>     at org.apache.spark.sql.Dataset.show(Dataset.scala:811)
>     at 
> org.apache.spark.opticloud.replaceWithAliases_Issue$.main(replaceWithAliases_Issue.scala:70)
>     at 
> org.apache.spark.opticloud.replaceWithAliases_Issue.main(replaceWithAliases_Issue.scala)
>  {code}
>  
>  
> Note: this issue is initially reported to the 
> [spark-xml|https://github.com/databricks/spark-xml/issues/580] repo. However, 
> it turns out be an issue within catalyst optimizer (the snippet above does 
> not has dependency on *spark-xml* ).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to