[jira] [Comment Edited] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494885#comment-16494885
 ] 

Liang-Chi Hsieh edited comment on SPARK-24410 at 5/30/18 8:41 AM:
--

I've done some experiments locally, and the results show that in the above case we don't guarantee that the data distribution is the same when reading the two tables {{a1}} and {{a2}}. By removing the shuffle, you actually don't get correct results.

Concretely, for the FileScan nodes (1) and (2):
{code:java}
*(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>

*(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>{code}

FileScan (1)'s rows are partitioned by {{key#25}} and FileScan (2)'s rows are partitioned by {{key#28}}. But nothing guarantees that the same value of {{key#25}} and {{key#28}} is located in the same partition.
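
For reference, a minimal sketch (assuming a spark-shell session with the same bucketed tables {{a1}} and {{a2}} as in the description) to inspect what partitioning each side claims. Even if both scans report hash partitioning over their own {{key}} attribute, the attributes differ ({{key#25}} vs. {{key#28}}), so the planner cannot treat the union inputs as co-partitioned:
{code}
// Sketch only: inspect the claimed output partitioning of each scan.
// `spark` is the spark-shell session; a1/a2 are the bucketed tables above.
val p1 = spark.table("a1").queryExecution.executedPlan.outputPartitioning
val p2 = spark.table("a2").queryExecution.executedPlan.outputPartitioning
println(s"a1: $p1")  // e.g. hash partitioning over a1's own key attribute
println(s"a2: $p2")  // same scheme, but over a different attribute
{code}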



> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use-case we have is a partially aggregated table plus daily
> increments that we need to further aggregate. We do this by unioning the two
> tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it
> seems that the union operator doesn't leverage the bucketing the way the
> join operator does.
> For example, for two bucketed tables {{a1}} and {{a2}}:
> {code}
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a1")
>
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> For the join query we get a SortMergeJoin without a shuffle:
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> :     +- *(1) Filter isnotnull(key#24L)
> :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>    +- *(2) Project [key#27L, t1#28L, t2#29L]
>       +- *(2) Filter isnotnull(key#27L)
>          +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct<key:bigint,t1:bigint,t2:bigint>
> {code}
> But for aggregation after a union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>    +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
>       +- Union
>          :- *(1) Project [key#25L]
>          :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
>          +- *(2) Project [key#28L]
>             +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<key:bigint>
> {code}
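
As a quick way to reproduce both plans side by side, a hypothetical spark-shell snippet (N and the warehouse path are placeholders, as above):
{code}
// Sketch only: print the physical plans for the join and the union-aggregate
// over the bucketed tables a1/a2 created above.
spark.sql("select * from a1 join a2 on (a1.key = a2.key)").explain()
spark.sql(
  """select key, count(*)
    |from (select * from a1 union all select * from a2) z
    |group by key""".stripMargin
).explain()
{code}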


[jira] [Comment Edited] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16496132#comment-16496132
 ] 

Liang-Chi Hsieh edited comment on SPARK-24410 at 5/31/18 5:39 AM:
--

We can verify the partitioning of the unioned DataFrame:
{code}
// Tag each row with the partition it is read from on each side.
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
// The outer select re-evaluates spark_partition_id() after the union,
// i.e. it reports the union's own partition numbering.
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
{code}
res8: Array[org.apache.spark.sql.Row] = Array([0,6], [0,7], [0,3], [1,0], 
[2,4], [2,8], [2,9], [2,2], [2,1], [2,5], [3,3], [3,7], [3,6], [4,0], [5,2], 
[5,9], [5,5], [5,8], [5,1], [5,4])
{code}

From the above result, we can see that the same {{key}} from {{df1}} and {{df2}} ends up in different partitions of the union. E.g., key {{6}} is at partition {{0}} and at partition {{3}}. So we still need a shuffle to get the correct results.
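
One hedged way to read this result (a sketch under the same assumptions): each table on its own does keep a given key in a single bucket; it is the Union that concatenates its children's partitions and renumbers them, so {{df2}}'s partition {{0}} shows up as partition {{3}} of the union:
{code}
// Sketch only: look at each side separately before the union.
df1.collect.foreach(println)  // e.g. keys 0-9 spread over partitions 0-2
df2.collect.foreach(println)  // df2's own ids are 0-2 here; the union above
                              // renumbers them as 3-5
{code}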





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)