[jira] [Resolved] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion

2021-12-20 Thread Kellan B Cummings (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kellan B Cummings resolved SPARK-37667.
---
Resolution: Fixed

> Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard 
> column expansion
> ---
>
> Key: SPARK-37667
> URL: https://issues.apache.org/jira/browse/SPARK-37667
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: Kellan B Cummings
>Priority: Major
> Fix For: 3.2.0
>
>
> I'm seeing a TreeNodeException ("Couldn't find {_}gen_alias{_}") when running 
> certain operations in Spark 3.1.2.
> A few conditions need to be met to trigger the bug:
>  - a DF with a nested struct joins to a second DF
>  - a filter that compares a column in the right DF to a column in the left DF
>  - wildcard column expansion of the nested struct
>  - a group by statement on a struct column
> *Data*
> git@github.com:kellanburket/spark3bug.git
>  
> {code:java}
> val rightDf = spark.read.parquet("right.parquet")
> val leftDf = spark.read.parquet("left.parquet"){code}
>  
> *Schemas*
> {code:java}
> leftDf.printSchema()
> root
>  |-- row: struct (nullable = true)
>  |    |-- mid: string (nullable = true)
>  |    |-- start: struct (nullable = true)
>  |    |    |-- latitude: double (nullable = true)
>  |    |    |-- longitude: double (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
> {code:java}
> rightDf.printSchema()
> root
>  |-- id: string (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
>  
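> The parquet files live in the repo above; for a self-contained reproduction, here is a rough sketch of equivalent in-memory DataFrames (field names follow the schemas above, the literal values are invented, and a spark-shell style SparkSession with its implicits in scope is assumed):
> {code:java}
> import org.apache.spark.sql.functions._
> import spark.implicits._
> 
> // Stand-in for left.parquet: a nested "row" struct plus the join key.
> val leftDf = Seq(
>   ("m1", 1.0, 2.0, 100L),
>   ("m2", 3.0, 4.0, 200L)
> ).toDF("mid", "latitude", "longitude", "s2_cell_id")
>   .select(
>     struct(col("mid"), struct(col("latitude"), col("longitude")).as("start")).as("row"),
>     col("s2_cell_id")
>   )
> 
> // Stand-in for right.parquet: a string id plus the same join key.
> val rightDf = Seq(("1.5", 100L), ("2.5", 200L)).toDF("id", "s2_cell_id"){code}
>  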
> *Breaking Code*
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
>  
> *Working Examples*
> The following examples don't seem to be affected by the bug.
> Works without group by:
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).show(){code}
> Works without filter
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works without wildcard expansion
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.start"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works with caching
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).cache().select(
>    col("row.*"),
>    col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> *Error message*
>  
>  
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
> +- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) 
> null else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
>    +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null 
> else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116 ASC NULLS FIRST], false, 0
>       +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
>          +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], 
> Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
>             :- BroadcastQueryStage 0
>             :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, 
> bigint, false]),false), [id=#3768]
>             :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, 
> s2_cell_id#2108L]
>             :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
>             :           +- FileScan parquet [row#2107,s2_cell_id#2108L] 
> Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, 
> Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], 
> PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: 
> struct<row:struct<mid:string,start:struct<latitude:double,longitude:double>>,s2_cell_id:bigint>
>             +- *(2) Filter (isnotnull(id#2103) AND 
> isnotnull(s2_cell_id#2104L))
>                +- *(2) ColumnarToRow
>                   +- FileScan parquet [id#2103,s2_cell_id#2104L] Batched: 
> true, DataFilters: [isnotnull(id#2103), isnotnull(s2_cell_id#2104L)], Format: 
> Parquet, Location: 

[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion

2021-12-20 Thread Kellan B Cummings (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kellan B Cummings updated SPARK-37667:
--
Fix Version/s: 3.2.0

> Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard 
> column expansion
> ---
>
> Key: SPARK-37667
> URL: https://issues.apache.org/jira/browse/SPARK-37667
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: Kellan B Cummings
>Priority: Major
> Fix For: 3.2.0
>
>
> I'm seeing a TreeNodeException ("Couldn't find {_}gen_alias{_}") when running 
> certain operations in Spark 3.1.2.
> A few conditions need to be met to trigger the bug:
>  - a DF with a nested struct joins to a second DF
>  - a filter that compares a column in the right DF to a column in the left DF
>  - wildcard column expansion of the nested struct
>  - a group by statement on a struct column
> *Data*
> git@github.com:kellanburket/spark3bug.git
>  
> {code:java}
> val rightDf = spark.read.parquet("right.parquet")
> val leftDf = spark.read.parquet("left.parquet"){code}
>  
> *Schemas*
> {code:java}
> leftDf.printSchema()
> root
>  |-- row: struct (nullable = true)
>  |    |-- mid: string (nullable = true)
>  |    |-- start: struct (nullable = true)
>  |    |    |-- latitude: double (nullable = true)
>  |    |    |-- longitude: double (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
> {code:java}
> rightDf.printSchema()
> root
>  |-- id: string (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
>  
> *Breaking Code*
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
>  
> *Working Examples*
> The following examples don't seem to be affected by the bug.
> Works without group by:
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).show(){code}
> Works without filter
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works without wildcard expansion
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.start"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works with caching
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).cache().select(
>    col("row.*"),
>    col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> *Error message*
>  
>  
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
> +- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) 
> null else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
>    +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null 
> else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116 ASC NULLS FIRST], false, 0
>       +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
>          +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], 
> Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
>             :- BroadcastQueryStage 0
>             :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, 
> bigint, false]),false), [id=#3768]
>             :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, 
> s2_cell_id#2108L]
>             :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
>             :           +- FileScan parquet [row#2107,s2_cell_id#2108L] 
> Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, 
> Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], 
> PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: 
> struct<row:struct<mid:string,start:struct<latitude:double,longitude:double>>,s2_cell_id:bigint>
>             +- *(2) Filter (isnotnull(id#2103) AND 
> isnotnull(s2_cell_id#2104L))
>                +- *(2) ColumnarToRow
>                   +- FileScan parquet [id#2103,s2_cell_id#2104L] Batched: 
> true, DataFilters: [isnotnull(id#2103), isnotnull(s2_cell_id#2104L)], Format: 
> Parquet, Location:

[jira] [Commented] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion

2021-12-20 Thread Kellan B Cummings (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17462677#comment-17462677
 ] 

Kellan B Cummings commented on SPARK-37667:
---

Seems like it's working in Spark 3.2. So I guess nothing to do but wait for the 
EMR upgrade. Thanks!
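
Until then, one possible stopgap on Spark 3.1.x is to gate the "works with caching" workaround from the description on the running version. This is just a sketch built from the snippets above (leftDf, rightDf and spark as defined there; the helper name is made up), not an official workaround:

{code:java}
import org.apache.spark.sql.functions._

// Hypothetical helper: materialize the joined/filtered DataFrame before the
// wildcard expansion, but only on pre-3.2.0 versions where the expansion +
// groupBy combination still fails. Naive string comparison is enough to tell
// 3.1.x from 3.2.x.
def joinedForExpansion(): org.apache.spark.sql.DataFrame = {
  val joined = leftDf.join(rightDf, "s2_cell_id").filter("id != row.start.latitude")
  if (spark.version < "3.2.0") joined.cache() else joined
}

joinedForExpansion()
  .select(col("row.*"), col("id"))
  .groupBy("start")
  .agg(min("id"))
  .show(){code}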

> Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard 
> column expansion
> ---
>
> Key: SPARK-37667
> URL: https://issues.apache.org/jira/browse/SPARK-37667
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: Kellan B Cummings
>Priority: Major
>
> I'm seeing a TreeNodeException ("Couldn't find {_}gen_alias{_}") when running 
> certain operations in Spark 3.1.2.
> A few conditions need to be met to trigger the bug:
>  - a DF with a nested struct joins to a second DF
>  - a filter that compares a column in the right DF to a column in the left DF
>  - wildcard column expansion of the nested struct
>  - a group by statement on a struct column
> *Data*
> git@github.com:kellanburket/spark3bug.git
>  
> {code:java}
> val rightDf = spark.read.parquet("right.parquet")
> val leftDf = spark.read.parquet("left.parquet"){code}
>  
> *Schemas*
> {code:java}
> leftDf.printSchema()
> root
>  |-- row: struct (nullable = true)
>  |    |-- mid: string (nullable = true)
>  |    |-- start: struct (nullable = true)
>  |    |    |-- latitude: double (nullable = true)
>  |    |    |-- longitude: double (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
> {code:java}
> rightDf.printSchema()
> root
>  |-- id: string (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
>  
> *Breaking Code*
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
>  
> *Working Examples*
> The following examples don't seem to be affected by the bug.
> Works without group by:
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).show(){code}
> Works without filter
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works without wildcard expansion
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.start"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works with caching
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).cache().select(
>    col("row.*"),
>    col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> *Error message*
>  
>  
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
> +- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) 
> null else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
>    +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null 
> else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116 ASC NULLS FIRST], false, 0
>       +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
>          +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], 
> Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
>             :- BroadcastQueryStage 0
>             :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, 
> bigint, false]),false), [id=#3768]
>             :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, 
> s2_cell_id#2108L]
>             :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
>             :           +- FileScan parquet [row#2107,s2_cell_id#2108L] 
> Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, 
> Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], 
> PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: 
> struct<row:struct<mid:string,start:struct<latitude:double,longitude:double>>,s2_cell_id:bigint>
>             +- *(2) Filter (isnotnull(id#2103) AND 
> isnotnull(s2_cell_id#2104L))
>                +- *(2) ColumnarToRow
>                   +- FileScan parquet [id#2103,s2_cell_id#2104L] Batched

[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion

2021-12-16 Thread Kellan B Cummings (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kellan B Cummings updated SPARK-37667:
--
Description: 
I'm seeing a TreeNodeException ("Couldn't find {_}gen_alias{_}") when running 
certain operations in Spark 3.1.2.

A few conditions need to be met to trigger the bug:
 - a DF with a nested struct joins to a second DF
 - a filter that compares a column in the right DF to a column in the left DF
 - wildcard column expansion of the nested struct
 - a group by statement on a struct column

*Data*
git@github.com:kellanburket/spark3bug.git

 
{code:java}
val rightDf = spark.read.parquet("right.parquet")
val leftDf = spark.read.parquet("left.parquet"){code}
 

*Schemas*
{code:java}
leftDf.printSchema()
root
 |-- row: struct (nullable = true)
 |    |-- mid: string (nullable = true)
 |    |-- start: struct (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |-- s2_cell_id: long (nullable = true){code}
{code:java}
rightDf.printSchema()
root
 |-- id: string (nullable = true)
 |-- s2_cell_id: long (nullable = true){code}
 

*Breaking Code*
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).select(
   col("row.*"), col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
 

*Working Examples*

The following examples don't seem to be affected by the bug.

Works without group by:
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).select(
   col("row.*"), col("id")
).show(){code}
Works without filter
{code:java}
leftDf.join(rightDf, "s2_cell_id").select(
   col("row.*"), col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
Works without wildcard expansion
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).select(
   col("row.start"), col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
Works with caching
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).cache().select(
   col("row.*"),
   col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
*Error message*
{code:java}
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
+- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) null 
else named_struct(latitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
longitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
   +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null else 
named_struct(latitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
longitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
start#2116 ASC NULLS FIRST], false, 0
      +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
         +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], 
Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
            :- BroadcastQueryStage 0
            :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, 
bigint, false]),false), [id=#3768]
            :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, 
s2_cell_id#2108L]
            :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
            :           +- FileScan parquet [row#2107,s2_cell_id#2108L] 
Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, 
Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], 
PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: 
struct<row:struct<mid:string,start:struct<latitude:double,longitude:double>>,s2_cell_id:bigint>
            +- *(2) Filter (isnotnull(id#2103) AND isnotnull(s2_cell_id#2104L))
               +- *(2) ColumnarToRow
                  +- FileScan parquet [id#2103,s2_cell_id#2104L] Batched: true, 
DataFilters: [isnotnull(id#2103), isnotnull(s2_cell_id#2104L)], Format: 
Parquet, Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/right], 
PartitionFilters: [], PushedFilters: [IsNotNull(id), IsNotNull(s2_cell_id)], 
ReadSchema: struct<id:string,s2_cell_id:bigint>
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$materializeFuture$1(ShuffleExchangeExec.scala:101)
  at org.apache.spark.sql.util.LazyValue.getOrInit(LazyValue.scala:41)
  at 
org.apache.spark.sql.execution.exchange.Exchange.getOrInitMaterializeFuture(Exchange.scala:71)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.materializeFuture(ShuffleExchangeExec.scala:97)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.materialize(ShuffleExchangeExec.scala:85)
 

[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard column expansion

2021-12-16 Thread Kellan B Cummings (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kellan B Cummings updated SPARK-37667:
--
Summary: Spark throws TreeNodeException ("Couldn't find gen_alias") during 
wildcard column expansion  (was: Spark throws TreeNodeException during wildcard 
column expansion)

> Spark throws TreeNodeException ("Couldn't find gen_alias") during wildcard 
> column expansion
> ---
>
> Key: SPARK-37667
> URL: https://issues.apache.org/jira/browse/SPARK-37667
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: Kellan B Cummings
>Priority: Major
>
> I'm seeing a TreeNodeException ("Couldn't find _gen_alias_") when running 
> certain operations in Spark 3.1.2.
> A few conditions need to be met to trigger the bug:
> - a DF with a nested struct joins to a second DF
> - a filter that compares a column in the right DF to a column in the left DF
> - wildcard column expansion of the nested struct
> - a group by statement on a struct column
> *Data*
> git@github.com:kellanburket/spark3bug.git
>  
> {code:java}
> val rightDf = spark.read.parquet("right.parquet")
> val leftDf = spark.read.parquet("left.parquet"){code}
>  
> *Schemas*
> {code:java}
> leftDf.printSchema()
> root
>  |-- row: struct (nullable = true)
>  |    |-- mid: string (nullable = true)
>  |    |-- start: struct (nullable = true)
>  |    |    |-- latitude: double (nullable = true)
>  |    |    |-- longitude: double (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
> {code:java}
> rightDf.printSchema()
> root
>  |-- id: string (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
>  
> *Breaking Code*
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
>  
> *Working Examples*
> The following examples don't seem to be affected by the bug.
> Works without group by:
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).show(){code}
> Works without filter
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works without variable expansion
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.start"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works with caching
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).cache().select(
>    col("row.*"),
>    col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> *Error message*
>  
>  
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
> +- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) 
> null else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
>    +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null 
> else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116 ASC NULLS FIRST], false, 0
>       +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
>          +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], 
> Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
>             :- BroadcastQueryStage 0
>             :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, 
> bigint, false]),false), [id=#3768]
>             :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, 
> s2_cell_id#2108L]
>             :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
>             :           +- FileScan parquet [row#2107,s2_cell_id#2108L] 
> Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, 
> Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], 
> PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: 
> struct<row:struct<mid:string,start:struct<latitude:double,longitude:double>>,s2_cell_id:bigint>
>             +- *(2) Filter (isnotnull(id#2103) AND 
> isnotnull(s2_cell_id#2104L))
>                +- *(2) ColumnarToRow
>                   +- FileScan parquet [id#2103,s2_cell_id#210

[jira] [Updated] (SPARK-37667) Spark throws TreeNodeException during wildcard column expansion

2021-12-16 Thread Kellan B Cummings (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-37667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kellan B Cummings updated SPARK-37667:
--
Summary: Spark throws TreeNodeException during wildcard column expansion  
(was: Spark throws TreeNodeException during variable expansion)

> Spark throws TreeNodeException during wildcard column expansion
> ---
>
> Key: SPARK-37667
> URL: https://issues.apache.org/jira/browse/SPARK-37667
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.1.2
>Reporter: Kellan B Cummings
>Priority: Major
>
> I'm seeing a TreeNodeException ("Couldn't find _gen_alias_") when running 
> certain operations in Spark 3.1.2.
> A few conditions need to be met to trigger the bug:
> - a DF with a nested struct joins to a second DF
> - a filter that compares a column in the right DF to a column in the left DF
> - wildcard column expansion of the nested struct
> - a group by statement on a struct column
> *Data*
> git@github.com:kellanburket/spark3bug.git
>  
> {code:java}
> val rightDf = spark.read.parquet("right.parquet")
> val leftDf = spark.read.parquet("left.parquet"){code}
>  
> *Schemas*
> {code:java}
> leftDf.printSchema()
> root
>  |-- row: struct (nullable = true)
>  |    |-- mid: string (nullable = true)
>  |    |-- start: struct (nullable = true)
>  |    |    |-- latitude: double (nullable = true)
>  |    |    |-- longitude: double (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
> {code:java}
> rightDf.printSchema()
> root
>  |-- id: string (nullable = true)
>  |-- s2_cell_id: long (nullable = true){code}
>  
> *Breaking Code*
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
>  
> *Working Examples*
> The following examples don't seem to be affected by the bug.
> Works without group by:
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.*"), col("id")
> ).show(){code}
> Works without filter
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").select(
>    col("row.*"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works without variable expansion
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).select(
>    col("row.start"), col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> Works with caching
> {code:java}
> leftDf.join(rightDf, "s2_cell_id").filter(
>     "id != row.start.latitude"
> ).cache().select(
>    col("row.*"),
>    col("id")
> ).groupBy(
>     "start"
> ).agg(
>    min("id")
> ).show(){code}
> *Error message*
>  
>  
> {code:java}
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
> Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
> +- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) 
> null else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
>    +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null 
> else named_struct(latitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
> longitude, 
> knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
> start#2116 ASC NULLS FIRST], false, 0
>       +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
>          +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], 
> Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
>             :- BroadcastQueryStage 0
>             :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, 
> bigint, false]),false), [id=#3768]
>             :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, 
> s2_cell_id#2108L]
>             :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
>             :           +- FileScan parquet [row#2107,s2_cell_id#2108L] 
> Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, 
> Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], 
> PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: 
> struct<row:struct<mid:string,start:struct<latitude:double,longitude:double>>,s2_cell_id:bigint>
>             +- *(2) Filter (isnotnull(id#2103) AND 
> isnotnull(s2_cell_id#2104L))
>                +- *(2) ColumnarToRow
>                   +- FileScan parquet [id#2103,s2_cell_id#2104L] Batched: 
> true, DataFilters: [isnotnull(id#2103), isnotnull(s2_cell_id#2104L)], Format: 

[jira] [Created] (SPARK-37667) Spark throws TreeNodeException during variable expansion

2021-12-16 Thread Kellan B Cummings (Jira)
Kellan B Cummings created SPARK-37667:
-

 Summary: Spark throws TreeNodeException during variable expansion
 Key: SPARK-37667
 URL: https://issues.apache.org/jira/browse/SPARK-37667
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.2
Reporter: Kellan B Cummings


I'm seeing a TreeNodeException ("Couldn't find _gen_alias_") when running 
certain operations in Spark 3.1.2.

A few conditions need to be met to trigger the bug:
- a DF with a nested struct joins to a second DF
- a filter that compares a column in the right DF to a column in the left DF
- wildcard column expansion of the nested struct
- a group by statement on a struct column

*Data*
git@github.com:kellanburket/spark3bug.git

 
{code:java}
val rightDf = spark.read.parquet("right.parquet")
val leftDf = spark.read.parquet("left.parquet"){code}
 

*Schemas*
{code:java}
leftDf.printSchema()
root
 |-- row: struct (nullable = true)
 |    |-- mid: string (nullable = true)
 |    |-- start: struct (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |-- s2_cell_id: long (nullable = true){code}
{code:java}
rightDf.printSchema()
root
 |-- id: string (nullable = true)
 |-- s2_cell_id: long (nullable = true){code}
 

*Breaking Code*
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).select(
   col("row.*"), col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
 

*Working Examples*

The following examples don't seem to be affected by the bug.

Works without group by:
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).select(
   col("row.*"), col("id")
).show(){code}
Works without filter
{code:java}
leftDf.join(rightDf, "s2_cell_id").select(
   col("row.*"), col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
Works without variable expansion
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).select(
   col("row.start"), col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
Works with caching
{code:java}
leftDf.join(rightDf, "s2_cell_id").filter(
    "id != row.start.latitude"
).cache().select(
   col("row.*"),
   col("id")
).groupBy(
    "start"
).agg(
   min("id")
).show(){code}
*Error message*
{code:java}
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(start#2116, 1024), ENSURE_REQUIREMENTS, [id=#3849]
+- SortAggregate(key=[knownfloatingpointnormalized(if (isnull(start#2116)) null 
else named_struct(latitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
longitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
start#2116], functions=[partial_min(id#2103)], output=[start#2116, min#2138])
   +- *(2) Sort [knownfloatingpointnormalized(if (isnull(start#2116)) null else 
named_struct(latitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.latitude)), 
longitude, 
knownfloatingpointnormalized(normalizenanandzero(start#2116.longitude AS 
start#2116 ASC NULLS FIRST], false, 0
      +- *(2) Project [_gen_alias_2133#2133 AS start#2116, id#2103]
         +- *(2) !BroadcastHashJoin [s2_cell_id#2108L], [s2_cell_id#2104L], 
Inner, BuildLeft, NOT (cast(id#2103 as double) = _gen_alias_2134#2134), false
            :- BroadcastQueryStage 0
            :  +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, 
bigint, false]),false), [id=#3768]
            :     +- *(1) Project [row#2107.start AS _gen_alias_2133#2133, 
s2_cell_id#2108L]
            :        +- *(1) Filter isnotnull(s2_cell_id#2108L)
            :           +- FileScan parquet [row#2107,s2_cell_id#2108L] 
Batched: false, DataFilters: [isnotnull(s2_cell_id#2108L)], Format: Parquet, 
Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/left], 
PartitionFilters: [], PushedFilters: [IsNotNull(s2_cell_id)], ReadSchema: 
struct<row:struct<mid:string,start:struct<latitude:double,longitude:double>>,s2_cell_id:bigint>
            +- *(2) Filter (isnotnull(id#2103) AND isnotnull(s2_cell_id#2104L))
               +- *(2) ColumnarToRow
                  +- FileScan parquet [id#2103,s2_cell_id#2104L] Batched: true, 
DataFilters: [isnotnull(id#2103), isnotnull(s2_cell_id#2104L)], Format: 
Parquet, Location: InMemoryFileIndex[s3://co.mira.public/spark3_bug/right], 
PartitionFilters: [], PushedFilters: [IsNotNull(id), IsNotNull(s2_cell_id)], 
ReadSchema: struct<id:string,s2_cell_id:bigint>
  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
  at 
org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$materializeFuture$1(ShuffleExchangeExec.scala:101)
  at org.apache.spark.sql.util.LazyValue.getOrInit(LazyValue.scala:41)
  at 
org.apache.spark.sql.execution.exchange.Exchange.getOrInitMaterializeFuture(Exchange.scala:71)
  at 
org.apache.spark.sql.execution.exc