Modi Tamam created SPARK-22898:
----------------------------------

             Summary: collect_set aggregation on bucketed table causes an exchange stage
                 Key: SPARK-22898
                 URL: https://issues.apache.org/jira/browse/SPARK-22898
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.3.0
            Reporter: Modi Tamam


I'm using Spark 2.2 to do a proof of concept of Spark's bucketing. I've created a bucketed table; here's the output of desc formatted my_bucketed_tbl:


    +--------------------+--------------------+-------+
    |            col_name|           data_type|comment|
    +--------------------+--------------------+-------+
    |              bundle|              string|   null|
    |                 ifa|              string|   null|
    |               date_|                date|   null|
    |                hour|                 int|   null|
    |                    |                    |       |
    |# Detailed Table ...|                    |       |
    |            Database|             default|       |
    |               Table|     my_bucketed_tbl|       |
    |               Owner|            zeppelin|       |
    |             Created|Thu Dec 21 13:43:...|       |
    |         Last Access|Thu Jan 01 00:00:...|       |
    |                Type|            EXTERNAL|       |
    |            Provider|                 orc|       |
    |         Num Buckets|                  16|       |
    |      Bucket Columns|             [`ifa`]|       |
    |        Sort Columns|             [`ifa`]|       |
    |    Table Properties|[transient_lastDd...|       |
    |            Location|hdfs:/user/hive/w...|       |
    |       Serde Library|org.apache.hadoop...|       |
    |         InputFormat|org.apache.hadoop...|       |
    |        OutputFormat|org.apache.hadoop...|       |
    |  Storage Properties|[serialization.fo...|       |
    +--------------------+--------------------+-------+
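
For context, a table with this layout can be produced with the DataFrameWriter bucketing API (a minimal sketch, assuming a source DataFrame df with the columns above; this is not necessarily the exact DDL behind the table in this report):

{code:java}
// Sketch: write a bucketed, sorted ORC table shaped like the one above.
// Assumes an existing DataFrame df with columns bundle, ifa, date_, hour.
df.write
  .format("orc")
  .bucketBy(16, "ifa") // Num Buckets: 16, Bucket Columns: [`ifa`]
  .sortBy("ifa")       // Sort Columns: [`ifa`]
  .saveAsTable("my_bucketed_tbl")
{code}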

When I run an explain on a group-by query, I can see that the exchange stage is avoided:


{code:java}
sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
+- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
   +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
      +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
{code}

But when I replace Spark's max function with collect_set, the execution plan is the same as for a non-bucketed table, i.e. the exchange stage is not avoided:


{code:java}
sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain

== Physical Plan ==
ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
+- Exchange hashpartitioning(ifa#1010, 200)
   +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
      +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
{code}
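
The presence or absence of the shuffle can also be checked programmatically instead of by eyeballing the plan. A minimal sketch, assuming Spark 2.2, where the physical shuffle operator is ShuffleExchange (renamed ShuffleExchangeExec in later versions):

{code:java}
import org.apache.spark.sql.execution.exchange.ShuffleExchange

// Collect any shuffle-exchange operators from the executed physical plan.
// Expected: empty for the max() query above, non-empty for collect_set().
val plan = sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa")
  .queryExecution.executedPlan
val exchanges = plan.collect { case e: ShuffleExchange => e }
println(s"shuffle exchanges in plan: ${exchanges.size}")
{code}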



