[jira] [Resolved] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion
[ https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kellan B Cummings resolved SPARK-37667.
---------------------------------------
    Resolution: Fixed

             Key: SPARK-37667
             URL: https://issues.apache.org/jira/browse/SPARK-37667
         Project: Spark
      Issue Type: Bug
      Components: SQL
Affects Versions: 3.1.2
        Reporter: Kellan B Cummings
        Priority: Major
         Fix For: 3.2.0

I'm seeing a TreeNodeException ("Couldn't find _gen_alias_") when running certain operations in Spark 3.1.2.

A few conditions need to be met to trigger the bug:
- a DF with a nested struct joins to a second DF
- a filter that compares a column in the right DF to a column in the left DF
- wildcard column expansion of the nested struct
- a group by statement on a struct column

*Data*

git@github.com:kellanburket/spark3bug.git

{code:java}
val rightDf = spark.read.parquet("right.parquet")
val leftDf = spark.read.parquet("left.parquet")
{code}

*Schemas*

{code:java}
leftDf.printSchema()
root
 |-- row: struct (nullable = true)
 |    |-- mid: string (nullable = true)
 |    |-- start: struct (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |-- s2_cell_id: long (nullable = true)
{code}

{code:java}
rightDf.printSchema()
root
 |-- id: string (nullable = true)
 |-- s2_cell_id: long (nullable = true)
{code}

*Breaking Code*

{code:java}
import org.apache.spark.sql.functions.{col, min}

leftDf.join(rightDf, "s2_cell_id").filter(
  "id != row.start.latitude"
).select(
  col("row.*"), col("id")
).groupBy(
  "start"
).agg(
  min("id")
).show()
{code}

*Working Examples*

The following examples don't seem to be affected by the bug.

Works without the group by:
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
  "id != row.start.latitude"
).select(
  col("row.*"), col("id")
).show()
{code}

Works without the filter:
{code:java}
leftDf.join(rightDf, "s2_cell_id").select(
  col("row.*"), col("id")
).groupBy(
  "start"
).agg(
  min("id")
).show()
{code}

Works without wildcard expansion:
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
  "id != row.start.latitude"
).select(
  col("row.start"), col("id")
).groupBy(
  "start"
).agg(
  min("id")
).show()
{code}

Works with caching:
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
  "id != row.start.latitude"
).cache().select(
  col("row.*"), col("id")
).groupBy(
  "start"
).agg(
  min("id")
).show()
{code}

*Error message*

{code:java}
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
+- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) null else named_struct(latitude, knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), longitude, knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude)))) AS start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
   +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null else named_struct(latitude, knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), longitude, knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude)))) AS start#2116 ASC NULLS FIRST], false, 0
      +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
         +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
            :- BroadcastQueryStage 0
            :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#3768]
            :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, s2_cell_id#2108L]
            :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
            :           +- FileScan parquet [row#2107,s2_cell_id#2108L] Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: struct>,s2_cell_id:bigint>
            +- *(2) Filter (isnotnull(id#2103) AND isnotnull(s2_cell_id#2104L))
               +- *(2) ColumnarToRow
                  +- FileScan parquet [id#2103,s2_cell_id#2104L] Batched: true, DataFilters: [isnotnull(id#2103), isnotnull(s2_cell_id#2104L)], Format: Parquet, Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/right], PartitionFilters: [], PushedFilters: [IsNotNull(id), IsNotNull(s2_cell_id)], ReadSchema: struct

  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$materializeFuture$1(ShuffleExchangeExec.scala:101)
  at org.apache.spark.sql.util.LazyValue.getOrInit(LazyValue.scala:41)
  at org.apache.spark.sql.execution.exchange.Exchange.getOrInitMaterializeFuture(Exchange.scala:71)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.materializeFuture(ShuffleExchangeExec.scala:97)
  at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.materialize(ShuffleExchangeExec.scala:85)
{code}
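For anyone who can't pull the linked repo, here is a minimal sketch that builds structurally equivalent DataFrames in memory, intended for a spark-shell session. Only the printSchema output above is taken from the issue; the case-class names and row values are made up. Note that case classes produce non-nullable leaf fields, unlike the parquet originals (all nullable), which may affect whether the exact plan reproduces.

{code:java}
import org.apache.spark.sql.SparkSession

// In spark-shell this returns the already-running session.
val spark = SparkSession.builder().master("local[*]").appName("spark3bug-repro").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for the repo's parquet files, shaped like the schemas above.
case class Start(latitude: Double, longitude: Double)
case class Inner(mid: String, start: Start)
case class LeftRec(row: Inner, s2_cell_id: Long)
case class RightRec(id: String, s2_cell_id: Long)

val leftDf = Seq(
  LeftRec(Inner("m1", Start(40.7, -74.0)), 1L),
  LeftRec(Inner("m2", Start(34.1, -118.2)), 2L)
).toDF()

val rightDf = Seq(
  RightRec("a", 1L),
  RightRec("b", 2L)
).toDF()
{code}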
[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion
[ https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kellan B Cummings updated SPARK-37667:
--------------------------------------
    Fix Version/s: 3.2.0
[jira] [Commented] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion
[ https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462677#comment-17462677 ]

Kellan B Cummings commented on SPARK-37667:
-------------------------------------------
Seems like it's working in Spark 3.2. So I guess nothing to do but wait for the EMR upgrade. Thanks!
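On clusters still running 3.1.x (e.g. EMR at the time of the comment), the caching workaround from the description can be applied conditionally. A sketch, assuming the leftDf/rightDf from the description and taking the 3.2.0 cutoff from this issue's Fix Version:

{code:java}
import org.apache.spark.sql.functions.{col, min}

// spark.version returns e.g. "3.1.2"; a plain string compare is adequate
// while major/minor versions are single digits. The cutoff assumes the
// fix shipped in 3.2.0, per this issue's Fix Version.
val needsWorkaround = spark.version < "3.2.0"

val joined = leftDf.join(rightDf, "s2_cell_id")
  .filter("id != row.start.latitude")

// .cache() sidesteps the failing plan on affected 3.1.x versions.
val base = if (needsWorkaround) joined.cache() else joined

base.select(col("row.*"), col("id"))
  .groupBy("start")
  .agg(min("id"))
  .show()
{code}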
[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion
[ https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kellan B Cummings updated SPARK-37667:
--------------------------------------
    Description: updated, changing "variable expansion" to "wildcard expansion" in one working-example heading; the revised text is the one quoted in full under the resolution above.
[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion
[ https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kellan B Cummings updated SPARK-37667:
--------------------------------------
    Summary: Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion
    (was: Spark throws TreeNodeException during wildcard column expansion)
[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException during wildcard column expansion
[ https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kellan B Cummings updated SPARK-37667:
--------------------------------------
    Summary: Spark throws TreeNodeException during wildcard column expansion
    (was: Spark throws TreeNodeException during variable expansion)
[jira] [Created] (SPARK-37667) Spark throws TreeNodeException during variable expansion
Kellan B Cummings created SPARK-37667:
--------------------------------------
         Summary: Spark throws TreeNodeException during variable expansion
             Key: SPARK-37667
             URL: https://issues.apache.org/jira/browse/SPARK-37667
         Project: Spark
      Issue Type: Bug
      Components: SQL
Affects Versions: 3.1.2
        Reporter: Kellan B Cummings

The original description is the one quoted in full under the resolution above, except that the summary and one working-example heading initially said "variable expansion" rather than "wildcard expansion".