[ 
https://issues.apache.org/jira/browse/SPARK-32059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Frank Yin updated SPARK-32059:
------------------------------
    Description: 
Using the tables and data structures defined in `SchemaPruningSuite.scala`:

 
{code:scala}
case class FullName(first: String, middle: String, last: String)
case class Company(name: String, address: String)
case class Employer(id: Int, company: Company)
case class Contact(
  id: Int,
  name: FullName,
  address: String,
  pets: Int,
  friends: Array[FullName] = Array.empty,
  relatives: Map[String, FullName] = Map.empty,
  employer: Employer = null,
  relations: Map[FullName, String] = Map.empty)
case class Department(
  depId: Int,
  depName: String,
  contactId: Int,
  employer: Employer)
{code}
 

The query to run:
{code:sql}
select a.name.first from (select row_number() over (partition by address order 
by id desc) as __rank, contacts.* from contacts) a where a.name.first = 'A' AND 
a.__rank = 1
{code}
 

The current physical plan. Note that the `ReadSchema` of the scan keeps the full `name` struct even though only `name.first` is referenced, because the Window operator blocks nested schema pruning:
{code}
== Physical Plan ==
*(3) Project [name#46.first AS first#74]
+- *(3) Filter (((isnotnull(name#46) AND isnotnull(__rank#71)) AND 
(name#46.first = A)) AND (__rank#71 = 1))
   +- Window [row_number() windowspecdefinition(address#47, id#45 DESC NULLS 
LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
__rank#71], [address#47], [id#45 DESC NULLS LAST]
      +- *(2) Sort [address#47 ASC NULLS FIRST, id#45 DESC NULLS LAST], false, 0
         +- Exchange hashpartitioning(address#47, 5), true, [id=#52]
            +- *(1) Project [id#45, name#46, address#47]
               +- FileScan parquet [id#45,name#46,address#47,p#53] Batched: 
false, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/private/var/folders/_c/4r2j33dd14n9ldfc2xqyzs400000gn/T/spark-85d173af-42...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<id:int,name:struct<first:string,middle:string,last:string>,address:string>
{code}
 

The desired physical plan, in which the scan reads only `name.first`:
{code}
== Physical Plan ==
*(3) Project [_gen_alias_77#77 AS first#74]
+- *(3) Filter (((isnotnull(_gen_alias_77#77) AND isnotnull(__rank#71)) AND 
(_gen_alias_77#77 = A)) AND (__rank#71 = 1))
   +- Window [row_number() windowspecdefinition(address#47, id#45 DESC NULLS 
LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS 
__rank#71], [address#47], [id#45 DESC NULLS LAST]
      +- *(2) Sort [address#47 ASC NULLS FIRST, id#45 DESC NULLS LAST], false, 0
         +- Exchange hashpartitioning(address#47, 5), true, [id=#52]
            +- *(1) Project [id#45, name#46.first AS _gen_alias_77#77, 
address#47]
               +- FileScan parquet [id#45,name#46,address#47,p#53] Batched: 
false, DataFilters: [], Format: Parquet, Location: 
InMemoryFileIndex[file:/private/var/folders/_c/4r2j33dd14n9ldfc2xqyzs400000gn/T/spark-c64e0b29-d9...,
 PartitionFilters: [], PushedFilters: [], ReadSchema: 
struct<id:int,name:struct<first:string>,address:string>
{code}
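For reference, a minimal `spark-shell` sketch of how this can be reproduced (assuming the `Contact` and `FullName` case classes above are in scope; the parquet path and sample rows here are illustrative, not from `SchemaPruningSuite.scala` itself):

{code:scala}
// Write a small Contact dataset to parquet and register it as a temp view.
val contacts = Seq(
  Contact(0, FullName("A", "B", "C"), "123 Main St", pets = 1),
  Contact(1, FullName("D", "E", "F"), "123 Main St", pets = 2))
spark.createDataFrame(contacts).write.parquet("/tmp/contacts")
spark.read.parquet("/tmp/contacts").createOrReplaceTempView("contacts")

// Run the query from the description and inspect the plan's ReadSchema.
spark.sql("""
  select a.name.first
  from (select row_number() over (partition by address order by id desc) as __rank,
               contacts.*
        from contacts) a
  where a.name.first = 'A' and a.__rank = 1
""").explain()
// With the window function present, ReadSchema contains the full
// struct<first:string,middle:string,last:string>; removing the
// row_number() subquery lets pruning reduce it to struct<first:string>.
{code}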



> Nested Schema Pruning not Working in Window Functions
> -----------------------------------------------------
>
>                 Key: SPARK-32059
>                 URL: https://issues.apache.org/jira/browse/SPARK-32059
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Frank Yin
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
