[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-03 Modi Tamam (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309407#comment-16309407 ]

Modi Tamam commented on SPARK-22898:
------------------------------------

Sure, no problem. I did double-check it on 2.2.1, and it looks just fine.
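For reference, a minimal sketch of this kind of check (the table name {{bucket_check}} is hypothetical), runnable in a spark-shell of the version under test:

{code}
// Sketch only: build a small bucketed table and assert that the
// collect_set plan contains no Exchange node.
import spark.implicits._

(0 until 50).map(i => (i % 13, i.toString)).toDF("j", "k")
  .write.format("parquet")
  .bucketBy(8, "j").sortBy("j")
  .saveAsTable("bucket_check")

val plan = sql("select j, collect_set(k) from bucket_check group by j")
  .queryExecution.executedPlan.toString
assert(!plan.contains("Exchange"))
{code}

On an affected version the assertion fails, since the plan still contains an Exchange hashpartitioning node.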

> collect_set aggregation on bucketed table causes an exchange stage
> -------------------------------------------------------------------
>
> Key: SPARK-22898
> URL: https://issues.apache.org/jira/browse/SPARK-22898
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.3.0
>Reporter: Modi Tamam
>  Labels: bucketing
>
> I'm using Spark 2.2 and am running a POC of Spark's bucketing. I've created a 
> bucketed table; here is the {{desc formatted my_bucketed_tbl}} output (a DDL 
> sketch follows the quoted description below):
> +--------------------+--------------------+-------+
> |            col_name|           data_type|comment|
> +--------------------+--------------------+-------+
> |              bundle|              string|   null|
> |                 ifa|              string|   null|
> |               date_|                date|   null|
> |                hour|                 int|   null|
> |                    |                    |       |
> |# Detailed Table ...|                    |       |
> |            Database|             default|       |
> |               Table|     my_bucketed_tbl|       |
> |               Owner|            zeppelin|       |
> |             Created|Thu Dec 21 13:43:...|       |
> |         Last Access|Thu Jan 01 00:00:...|       |
> |                Type|            EXTERNAL|       |
> |            Provider|                 orc|       |
> |         Num Buckets|                  16|       |
> |      Bucket Columns|             [`ifa`]|       |
> |        Sort Columns|             [`ifa`]|       |
> |    Table Properties|[transient_lastDd...|       |
> |            Location|hdfs:/user/hive/w...|       |
> |       Serde Library|org.apache.hadoop...|       |
> |         InputFormat|org.apache.hadoop...|       |
> |        OutputFormat|org.apache.hadoop...|       |
> |  Storage Properties|[serialization.fo...|       |
> +--------------------+--------------------+-------+
> When I run explain on a group-by query, I can see that the exchange phase is 
> spared:
> {code:java}
> sql("select ifa,max(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> SortAggregate(key=[ifa#932], functions=[max(bundle#920)])
> +- SortAggregate(key=[ifa#932], functions=[partial_max(bundle#920)])
>    +- *Sort [ifa#932 ASC NULLS FIRST], false, 0
>       +- *FileScan orc default.level_1[bundle#920,ifa#932] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
> {code}
> But when I replace Spark's max function with collect_set, the execution plan 
> is the same as for a non-bucketed table, i.e., the exchange phase is not 
> spared:
> {code:java}
> sql("select ifa,collect_set(bundle) from my_bucketed_tbl group by ifa").explain
> == Physical Plan ==
> ObjectHashAggregate(keys=[ifa#1010], functions=[collect_set(bundle#998, 0, 0)])
> +- Exchange hashpartitioning(ifa#1010, 200)
>    +- ObjectHashAggregate(keys=[ifa#1010], functions=[partial_collect_set(bundle#998, 0, 0)])
>       +- *FileScan orc default.level_1[bundle#998,ifa#1010] Batched: false, Format: ORC, Location: InMemoryFileIndex[hdfs://ip-10-44-9-73.ec2.internal:8020/user/hive/warehouse/level_1/date_=2017-1..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<bundle:string,ifa:string>
> {code}
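For context, here is a sketch of DDL that would produce a table matching the quoted desc output (the LOCATION path is an assumption, and any partitioning is omitted; an explicit LOCATION is what makes the table EXTERNAL):

{code}
// Sketch only: reconstructed from the desc output, not the reporter's
// actual DDL. Partition columns, if any, are omitted.
sql("""
  CREATE TABLE my_bucketed_tbl (bundle STRING, ifa STRING, date_ DATE, hour INT)
  USING ORC
  CLUSTERED BY (ifa) SORTED BY (ifa) INTO 16 BUCKETS
  LOCATION 'hdfs:/user/hive/warehouse/my_bucketed_tbl'
""")
{code}

The CLUSTERED BY ... SORTED BY ... INTO 16 BUCKETS clause is what populates the Num Buckets, Bucket Columns, and Sort Columns rows shown in the desc output.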





[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-02 Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308970#comment-16308970 ]

Liang-Chi Hsieh commented on SPARK-22898:
-----------------------------------------

If there's no problem, I will resolve this as a duplicate. You can re-open it 
if you have other questions.



[jira] [Commented] (SPARK-22898) collect_set aggregation on bucketed table causes an exchange stage

2018-01-01 Liang-Chi Hsieh (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-22898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16307591#comment-16307591 ]

Liang-Chi Hsieh commented on SPARK-22898:
-----------------------------------------

I think this should already be fixed by SPARK-3.

I did a test with the current master branch:

{code}
// Run in spark-shell; toDF needs the session implicits in scope.
import spark.implicits._

val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
df.write
  .format("parquet")
  .bucketBy(8, "j")
  .sortBy("j")
  .saveAsTable("bucketed_table")
sql("select j, max(k) from bucketed_table group by j").explain
sql("select j, collect_set(k) from bucketed_table group by j").explain
{code}

Neither plan contains an Exchange node:

{code}
== Physical Plan ==
SortAggregate(key=[j#4851], functions=[max(k#4852)])
+- SortAggregate(key=[j#4851], functions=[partial_max(k#4852)])
   +- *Sort [j#4851 ASC NULLS FIRST], false, 0
      +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<j:int,k:string>
== Physical Plan ==
ObjectHashAggregate(keys=[j#4851], functions=[collect_set(k#4852, 0, 0)])
+- ObjectHashAggregate(keys=[j#4851], functions=[partial_collect_set(k#4852, 0, 0)])
   +- *FileScan parquet default.bucketed_table[j#4851,k#4852] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/root/repos/spark-1/sql/core/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<j:int,k:string>
{code}
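As a cross-check (a sketch using the existing {{spark.sql.sources.bucketing.enabled}} flag), disabling bucketing should make the Exchange reappear, confirming that its absence above comes from the bucket metadata:

{code}
// Sketch only: with bucketing disabled, the shuffle comes back.
spark.conf.set("spark.sql.sources.bucketing.enabled", "false")
sql("select j, collect_set(k) from bucketed_table group by j").explain
// Expect an "Exchange hashpartitioning(j#..., 200)" node in the plan.
spark.conf.set("spark.sql.sources.bucketing.enabled", "true")
{code}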


