[ 
https://issues.apache.org/jira/browse/SPARK-37201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Kotlov updated SPARK-37201:
----------------------------------
    Description: 
In this example, Spark still reads unnecessary nested fields.

Data preparation:
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, array: Seq[String])

Seq(
  Event(Struct("v1","v2","v3"), Seq("cx1", "cx2"))
).toDF().write.mode("overwrite").saveAsTable("table")
{code}
The v2 and v3 columns aren't needed here, but they still appear in the read schema of the physical plan.
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
 
== Physical Plan ==
... ReadSchema: 
struct<struct:struct<v1:string,v2:string,v3:string>,array:array<string>>

{code}
If you simply remove the _filter_ or move the _explode_ into a second _select_, pruning works correctly:
{code:java}
spark.table("table")
  .select($"struct.v1", explode($"array").as("el"))
  //.filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>

spark.table("table")
  .select($"struct.v1", $"array")
  .select($"v1", explode($"array").as("el"))
  .filter($"el" === "cx1")
  .explain(true)
  
// ... ReadSchema: struct<struct:struct<v1:string>,array:array<string>>
{code}
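For reference, the expected outcome can be illustrated with a minimal, hypothetical Python sketch of nested schema pruning (this is not Spark's actual SchemaPruning rule; schemas are represented as plain dicts purely for illustration). Given the table schema and the field paths the query references, only struct.v1 and array should survive:

```python
# Hypothetical illustration of nested schema pruning (not Spark's actual code):
# given a table schema and the set of field paths a query references,
# compute the minimal read schema. Field names and the expected result
# follow the example above.

def prune_schema(schema, referenced_paths):
    """Keep only the fields of `schema` (a dict of name -> type, where a
    struct type is itself a dict) that appear in `referenced_paths`."""
    pruned = {}
    for name, typ in schema.items():
        # A path referencing the whole column, e.g. "array"
        if name in referenced_paths:
            pruned[name] = typ
            continue
        # Paths referencing nested fields, e.g. "struct.v1"
        nested = {p.split(".", 1)[1] for p in referenced_paths
                  if p.startswith(name + ".")}
        if nested and isinstance(typ, dict):
            pruned[name] = prune_schema(typ, nested)
    return pruned

table_schema = {
    "struct": {"v1": "string", "v2": "string", "v3": "string"},
    "array": "array<string>",
}

# The query references only struct.v1 and array, so v2/v3 should be pruned.
print(prune_schema(table_schema, {"struct.v1", "array"}))
# → {'struct': {'v1': 'string'}, 'array': 'array<string>'}
```

The bug is that adding a filter on the exploded column (or a left_anti join, below) causes Spark to fall back to reading the full struct instead of the pruned one.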
 

*Yet another example: a left_anti join after a double select:*
{code:java}
case class Struct(v1: String, v2: String, v3: String)
case class Event(struct: Struct, field1: String, field2: String)
Seq(
  Event(Struct("v1","v2","v3"), "fld1", "fld2")
).toDF().write.mode("overwrite").saveAsTable("table")

val joinDf = Seq("id1").toDF("id")

spark.table("table")
  .select("struct", "field1")
  .select($"struct.v1", $"field1")
  .join(joinDf, $"field1" === joinDf("id"), "left_anti")
  .explain(true)

// ===> ReadSchema: 
struct<struct:struct<v1:string,v2:string,v3:string>,field1:string>
{code}
If you remove the first select or change the join type, nested-field pruning works correctly:
{code:java}
// .select("struct", "field1")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>

.join(joinDf, $"field1" === joinDf("id"), "left")
===> ReadSchema: struct<struct:struct<v1:string>,field1:string>
{code}
PS: The first select might look strange in the context of this example, but in a real system it could be part of a common API that other parts of the system build their own expressions on top of.


> Spark SQL reads unnecessary nested fields (filter after explode)
> ----------------------------------------------------------------
>
>                 Key: SPARK-37201
>                 URL: https://issues.apache.org/jira/browse/SPARK-37201
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Sergey Kotlov
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
