Yuming Wang created SPARK-37915:
-----------------------------------

             Summary: Push down deterministic projection through SQL UNION
                 Key: SPARK-37915
                 URL: https://issues.apache.org/jira/browse/SPARK-37915
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.3.0
            Reporter: Yuming Wang


{code:scala}
spark.range(11).selectExpr("cast(id as decimal(18, 1)) as a", "id as b", "id as c").write.saveAsTable("t1")
spark.range(12).selectExpr("cast(id as decimal(18, 2)) as a", "id as b", "id as c").write.saveAsTable("t2")
spark.range(13).selectExpr("cast(id as decimal(18, 3)) as a", "id as b", "id as c").write.saveAsTable("t3")
spark.range(14).selectExpr("cast(id as decimal(18, 4)) as a", "id as b", "id as c").write.saveAsTable("t4")
spark.range(15).selectExpr("cast(id as decimal(18, 5)) as a", "id as b", "id as c").write.saveAsTable("t5")

sql("select a from t1 union select a from t2 union select a from t3 union select a from t4 union select a from t5").explain(true)
{code}

Current: decimal-precision coercion inserts a widening cast {{Project}} between each pair of unions, so {{CombineUnions}} cannot flatten them, and every intermediate distinct is planned as its own {{HashAggregate}}/{{Exchange}}/{{HashAggregate}}:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#76], functions=[], output=[a#76])
   +- Exchange hashpartitioning(a#76, 5), ENSURE_REQUIREMENTS, [id=#159]
      +- HashAggregate(keys=[a#76], functions=[], output=[a#76])
         +- Union
            :- HashAggregate(keys=[a#74], functions=[], output=[a#76])
            :  +- Exchange hashpartitioning(a#74, 5), ENSURE_REQUIREMENTS, [id=#154]
            :     +- HashAggregate(keys=[a#74], functions=[], output=[a#74])
            :        +- Union
            :           :- HashAggregate(keys=[a#72], functions=[], output=[a#74])
            :           :  +- Exchange hashpartitioning(a#72, 5), ENSURE_REQUIREMENTS, [id=#149]
            :           :     +- HashAggregate(keys=[a#72], functions=[], output=[a#72])
            :           :        +- Union
            :           :           :- HashAggregate(keys=[a#70], functions=[], output=[a#72])
            :           :           :  +- Exchange hashpartitioning(a#70, 5), ENSURE_REQUIREMENTS, [id=#144]
            :           :           :     +- HashAggregate(keys=[a#70], functions=[], output=[a#70])
            :           :           :        +- Union
            :           :           :           :- Project [cast(a#55 as decimal(19,2)) AS a#70]
            :           :           :           :  +- FileScan parquet default.t1[a#55] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,1)>
            :           :           :           +- Project [cast(a#58 as decimal(19,2)) AS a#71]
            :           :           :              +- FileScan parquet default.t2[a#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,2)>
            :           :           +- Project [cast(a#61 as decimal(20,3)) AS a#73]
            :           :              +- FileScan parquet default.t3[a#61] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,3)>
            :           +- Project [cast(a#64 as decimal(21,4)) AS a#75]
            :              +- FileScan parquet default.t4[a#64] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,4)>
            +- Project [cast(a#67 as decimal(22,5)) AS a#77]
               +- FileScan parquet default.t5[a#67] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,5)>
{noformat}


Expected: pushing the deterministic cast projections down into each union branch lets the unions be combined into a single flat {{Union}} with only one distinct aggregation, eliminating the intermediate shuffles:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#76], functions=[], output=[a#76])
   +- Exchange hashpartitioning(a#76, 5), ENSURE_REQUIREMENTS, [id=#111]
      +- HashAggregate(keys=[a#76], functions=[], output=[a#76])
         +- Union
            :- Project [cast(cast(cast(cast(a#55 as decimal(19,2)) as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS a#76]
            :  +- FileScan parquet default.t1[a#55] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,1)>
            :- Project [cast(cast(cast(cast(a#58 as decimal(19,2)) as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS a#89]
            :  +- FileScan parquet default.t2[a#58] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,2)>
            :- Project [cast(cast(cast(a#61 as decimal(20,3)) as decimal(21,4)) as decimal(22,5)) AS a#87]
            :  +- FileScan parquet default.t3[a#61] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,3)>
            :- Project [cast(cast(a#64 as decimal(21,4)) as decimal(22,5)) AS a#84]
            :  +- FileScan parquet default.t4[a#64] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,4)>
            +- Project [cast(a#67 as decimal(22,5)) AS a#77]
               +- FileScan parquet default.t5[a#67] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/yumwang/spark/SPARK-31890/external/avro/spark-warehouse/or..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:decimal(18,5)>
{noformat}
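The core rewrite is mechanical for the direct {{Project}}-over-{{Union}} case. Below is a minimal sketch of such a Catalyst rule; the object name and details are illustrative, not the actual patch, and the real fix must additionally look through the {{Distinct}} that sits between the unions in the plans above. It duplicates a deterministic project list into every branch, rewriting attribute references against each child's output, so that {{CombineUnions}} can then flatten the adjacent unions:

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, NamedExpression}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative sketch only: push a deterministic Project below a Union so
// that CombineUnions can flatten adjacent unions afterwards.
object PushProjectionThroughUnionSketch extends Rule[LogicalPlan] {

  // Re-express one projection against a non-first child's output, giving
  // top-level aliases fresh ExprIds so branches don't share attribute ids.
  private def pushToChild(
      e: NamedExpression,
      rewrites: AttributeMap[Attribute]): NamedExpression = {
    e.transform {
      case a: Attribute => rewrites.getOrElse(a, a)
    } match {
      case a: Alias => Alias(a.child, a.name)()
      case n: NamedExpression => n
    }
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
    // Only deterministic expressions are safe to re-evaluate once per branch.
    case Project(projectList, u: Union) if projectList.forall(_.deterministic) =>
      val first = Project(projectList, u.children.head)
      val rest = u.children.tail.map { child =>
        // Union children line up positionally, so zip the outputs.
        val rewrites = AttributeMap(u.children.head.output.zip(child.output))
        Project(projectList.map(pushToChild(_, rewrites)), child)
      }
      u.copy(children = first +: rest)
  }
}
{code}

The determinism guard matters: a non-deterministic expression such as {{rand()}} cannot be re-evaluated per branch without changing results, which is why only deterministic projections qualify for this push-down.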



