[ https://issues.apache.org/jira/browse/SPARK-40706?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Bruce Robbins resolved SPARK-40706.
-----------------------------------
    Resolution: Duplicate

IllegalStateException when querying array values inside a nested struct
------------------------------------------------------------------------

                Key: SPARK-40706
                URL: https://issues.apache.org/jira/browse/SPARK-40706
            Project: Spark
         Issue Type: Bug
         Components: PySpark
   Affects Versions: 3.2.0
           Reporter: Rohan Barman
           Priority: Major

We are in the process of migrating our PySpark applications from Spark version 3.1.2 to Spark version 3.2.0. This bug is present in version 3.2.0; we do not see the issue in version 3.1.2.

*Minimal example to reproduce bug*

Below is a minimal example that generates hardcoded data and queries. The data has several nested structs and arrays. Our real use case reads data from Avro files and has more complex queries, but this is sufficient to reproduce the error.

{code:java}
# Generate data
data = [
    ('1', {
        'timestamp': '09/07/2022',
        'message': 'm1',
        'data': {
            'items': {
                'id': '1',
                'attempt': [
                    {'risk': [
                        {'score': ['1', '2', '3']},
                        {'indicator': [
                            {'code': 'c1', 'value': 'abc'},
                            {'code': 'c2', 'value': 'def'}
                        ]}
                    ]}
                ]
            }
        }
    })
]

from pyspark.sql.types import *

schema = StructType([
    StructField('id', StringType(), True),
    StructField('response', StructType([
        StructField('timestamp', StringType(), True),
        StructField('message', StringType(), True),
        StructField('data', StructType([
            StructField('items', StructType([
                StructField('id', StringType(), True),
                StructField('attempt', ArrayType(StructType([
                    StructField('risk', ArrayType(StructType([
                        StructField('score', ArrayType(StringType()), True),
                        StructField('indicator', ArrayType(StructType([
                            StructField('code', StringType(), True),
                            StructField('value', StringType(), True),
                        ])))
                    ])))
                ])))
            ]))
        ]))
    ])),
])

df = spark.createDataFrame(data=data, schema=schema)
df.printSchema()
df.createOrReplaceTempView("tbl")

# Execute query
query = """
SELECT
  response.message as message,
  response.timestamp as timestamp,
  score as risk_score,
  model.value as model_type
FROM tbl
LATERAL VIEW OUTER explode(response.data.items.attempt) AS Attempt
LATERAL VIEW OUTER explode(response.data.items.attempt.risk) AS RiskModels
LATERAL VIEW OUTER explode(RiskModels) AS RiskModel
LATERAL VIEW OUTER explode(RiskModel.indicator) AS Model
LATERAL VIEW OUTER explode(RiskModel.Score) AS Score
"""

result = spark.sql(query)
print(result.count())
print(result.head(10))
{code}

*Post execution*

The above code throws an IllegalStateException. The entire error log is posted at the end of this ticket.

{code:java}
java.lang.IllegalStateException: Couldn't find _extract_timestamp#44 in [_extract_message#50,RiskModel#12]
{code}

The error seems to indicate that the _timestamp_ column is not available. However, we do see _timestamp_ if we print the schema of the source DataFrame:
{code:java}
# df.printSchema()
root
 |-- id: string (nullable = true)
 |-- response: struct (nullable = true)
 |    |-- timestamp: string (nullable = true)
 |    |-- message: string (nullable = true)
 |    |-- data: struct (nullable = true)
 |    |    |-- items: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- attempt: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- risk: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- score: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: string (containsNull = true)
 |    |    |    |    |    |    |    |-- indicator: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- value: string (nullable = true)
{code}

*Extra observations*

We noticed that several query modifications make the error go away.

1) The error goes away if we also explicitly select all columns:

{code:java}
SELECT
  *,
  response.message as message,
  response.timestamp as timestamp,
  score as risk_score,
  model.value as model_type
FROM tbl
LATERAL VIEW OUTER explode(response.data.items.attempt) AS Attempt
LATERAL VIEW OUTER explode(response.data.items.attempt.risk) AS RiskModels
LATERAL VIEW OUTER explode(RiskModels) AS RiskModel
LATERAL VIEW OUTER explode(RiskModel.indicator) AS Model
LATERAL VIEW OUTER explode(RiskModel.Score) AS Score
{code}

2) The error goes away if we only query one of the nested arrays:

{code:java}
SELECT
  response.message as message,
  response.timestamp as timestamp,
  --score as risk_score,
  model.value as model_type
FROM tbl
LATERAL VIEW OUTER explode(response.data.items.attempt) AS Attempt
LATERAL VIEW OUTER explode(response.data.items.attempt.risk) AS RiskModels
LATERAL VIEW OUTER explode(RiskModels) AS RiskModel
LATERAL VIEW OUTER explode(RiskModel.indicator) AS Model
--LATERAL VIEW OUTER explode(RiskModel.Score) AS Score
{code}

We also noticed that the IllegalStateException refers to the second column in the SELECT list. For example, if _timestamp_ is second, the exception refers to _timestamp_:

{code:java}
SELECT
  response.message as message,
  response.timestamp as timestamp,
  .....
{code}
{code:java}
java.lang.IllegalStateException: Couldn't find _extract_timestamp#53 in [_extract_message#59,RiskModel#21]
{code}

If we swap the order so that _message_ is second, the exception refers to _message_:

{code:java}
SELECT
  response.timestamp as timestamp,
  response.message as message,
  .....
{code}
{code:java}
java.lang.IllegalStateException: Couldn't find _extract_message#53 in [_extract_timestamp#59,RiskModel#21]
{code}

*---*

Are there any Spark settings that can resolve this? Or an alternate way to query deeply nested structs?
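One avenue worth trying while the underlying bug is open: the {{_extract_*}} attributes in the message are aliases produced by Catalyst's nested-column pruning (the NestedColumnAliasing rule), so disabling that optimization for the session may avoid the failing plan. The sketch below is untested against this exact repro, and it trades the error for reading more of the nested data; both flags exist in Spark 3.2.x.

{code:java}
# Hedged workaround sketch, not a confirmed fix: turn off nested-column
# pruning, which is what generates the "_extract_*" aliases the planner
# later fails to resolve, then re-run the same query as above.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "false")
spark.conf.set("spark.sql.optimizer.expression.nestedPruning.enabled", "false")

result = spark.sql(query)  # `query` is the SQL string defined earlier
print(result.count())
{code}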
*Full error:*

{code:java}
py4j.protocol.Py4JJavaError: An error occurred while calling o40.collectToPython.
: java.lang.IllegalStateException: Couldn't find _extract_timestamp#44 in [_extract_message#50,RiskModel#12]
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:528)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:73)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.$anonfun$bindReferences$1(BoundAttribute.scala:94)
    at scala.collection.immutable.List.map(List.scala:297)
    at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReferences(BoundAttribute.scala:94)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.GenerateExec.codeGenCollection(GenerateExec.scala:232)
    at org.apache.spark.sql.execution.GenerateExec.doConsume(GenerateExec.scala:145)
    at org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:223)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.GenerateExec.codeGenCollection(GenerateExec.scala:232)
    at org.apache.spark.sql.execution.GenerateExec.doConsume(GenerateExec.scala:145)
    at org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:223)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:89)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.execution.GenerateExec.consume(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.GenerateExec.codeGenCollection(GenerateExec.scala:232)
    at org.apache.spark.sql.execution.GenerateExec.doConsume(GenerateExec.scala:145)
    at org.apache.spark.sql.execution.CodegenSupport.constructDoConsumeFunction(WholeStageCodegenExec.scala:223)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:194)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:89)
    at org.apache.spark.sql.execution.CodegenSupport.consume(WholeStageCodegenExec.scala:196)
    at org.apache.spark.sql.execution.CodegenSupport.consume$(WholeStageCodegenExec.scala:151)
    at org.apache.spark.sql.execution.RDDScanExec.consume(ExistingRDD.scala:132)
    at org.apache.spark.sql.execution.InputRDDCodegen.doProduce(WholeStageCodegenExec.scala:485)
    at org.apache.spark.sql.execution.InputRDDCodegen.doProduce$(WholeStageCodegenExec.scala:458)
    at org.apache.spark.sql.execution.RDDScanExec.doProduce(ExistingRDD.scala:132)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.RDDScanExec.produce(ExistingRDD.scala:132)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.GenerateExec.doProduce(GenerateExec.scala:134)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.GenerateExec.produce(GenerateExec.scala:58)
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:55)
    at org.apache.spark.sql.execution.CodegenSupport.$anonfun$produce$1(WholeStageCodegenExec.scala:97)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.CodegenSupport.produce(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.CodegenSupport.produce$(WholeStageCodegenExec.scala:92)
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:42)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:660)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:194)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:232)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:229)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:190)
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:340)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:473)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3688)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
{code}
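On the second question, an alternate way to query deeply nested structs: below is a sketch of the same flattening written with the DataFrame API, using explode_outer as the counterpart of LATERAL VIEW OUTER explode. It explodes each level from the previously exploded column rather than re-exploding from the root, so it is equivalent in intent rather than a line-for-line translation, and it has not been verified to avoid the 3.2.0 planner bug.

{code:java}
# Sketch: flatten the nested arrays step by step with explode_outer,
# then project the same four output columns as the SQL version above.
from pyspark.sql.functions import col, explode_outer

result = (
    df
    .withColumn("Attempt", explode_outer("response.data.items.attempt"))
    .withColumn("RiskModel", explode_outer("Attempt.risk"))
    .withColumn("Model", explode_outer("RiskModel.indicator"))
    .withColumn("Score", explode_outer("RiskModel.score"))
    .select(
        col("response.message").alias("message"),
        col("response.timestamp").alias("timestamp"),
        col("Score").alias("risk_score"),
        col("Model.value").alias("model_type"),
    )
)
print(result.count())
{code}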