[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-09-14 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614683#comment-16614683
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], I see that the PR is now closed (postponed) due to concerns about a
performance regression when parent operators do not require the partitioning.

I think this can be solved by introducing a rule that identifies an Exchange
over a Union whose children already provide the required partitioning. In this
case the union operator can be replaced by a partitioning-aware union operator
that merges the partitions (basically implementing your approach as presented
in the PR).

I think this approach limits the regression mentioned in the pull request and
improves performance when upstream requires a matching partitioning. Do you
think this approach also has to be governed by a cost-based decision?
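
To make the proposed rule concrete, here is a minimal sketch on a toy plan tree. It is not Spark code: `Scan`, `Union`, `PartitionAwareUnion`, `Exchange` and `rewrite` are hypothetical names invented for illustration, and partitioning columns are compared by name rather than by expression id.

```scala
// Toy plan nodes; every name here is hypothetical, not a Spark physical operator.
object ExchangeOverUnionSketch {
  sealed trait Plan
  // A leaf that is already hash-partitioned on `keys` into `numPartitions` partitions.
  final case class Scan(table: String, keys: Seq[String], numPartitions: Int) extends Plan
  // A plain union: makes no promise about its output partitioning.
  final case class Union(children: Seq[Plan]) extends Plan
  // A union that merges partition i of every child into output partition i.
  final case class PartitionAwareUnion(children: Seq[Plan], keys: Seq[String], numPartitions: Int) extends Plan
  // A shuffle to hash partitioning on `keys`.
  final case class Exchange(keys: Seq[String], numPartitions: Int, child: Plan) extends Plan

  // Does `p` already deliver hash partitioning on `keys` with `n` partitions?
  private def satisfies(p: Plan, keys: Seq[String], n: Int): Boolean = p match {
    case Scan(_, k, m)                => k == keys && m == n
    case PartitionAwareUnion(_, k, m) => k == keys && m == n
    case _                            => false
  }

  // The rule: an Exchange directly over a Union whose children all satisfy the
  // requested partitioning is replaced by a partition-aware union, dropping the shuffle.
  // A Union without an Exchange on top is left untouched.
  def rewrite(plan: Plan): Plan = plan match {
    case Exchange(keys, n, Union(children))
        if children.nonEmpty && children.forall(satisfies(_, keys, n)) =>
      PartitionAwareUnion(children, keys, n)
    case other => other
  }
}
```

Because the rule only fires underneath an Exchange, a union whose parent does not ask for any partitioning keeps its current behavior, which is the point of limiting the regression.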

 

> Missing optimization for Union on bucketed tables
> -
>
> Key: SPARK-24410
> URL: https://issues.apache.org/jira/browse/SPARK-24410
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ohad Raviv
>Priority: Major
>
> A common use case we have is a partially aggregated table and daily 
> increments that we need to aggregate further. We do this by unioning the two 
> tables and aggregating again.
> We tried to optimize this process by bucketing the tables, but currently it 
> seems that the union operator doesn't leverage the tables being bucketed 
> (as the join operator does).
> For example, for two bucketed tables a1, a2:
> {code}
> import org.apache.spark.sql.SaveMode
> import org.apache.spark.sql.functions.col
>
> // N is the number of rows to generate; both tables get the same layout.
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a1")
>
> sparkSession.range(N).selectExpr(
>     "id as key",
>     "id % 2 as t1",
>     "id % 3 as t2")
>   .repartition(col("key"))
>   .write
>   .mode(SaveMode.Overwrite)
>   .bucketBy(3, "key")
>   .sortBy("t1")
>   .saveAsTable("a2")
> {code}
> For the join query we get the "SortMergeJoin":
> {code}
> select * from a1 join a2 on (a1.key=a2.key)
> == Physical Plan ==
> *(3) SortMergeJoin [key#24L], [key#27L], Inner
> :- *(1) Sort [key#24L ASC NULLS FIRST], false, 0
> :  +- *(1) Project [key#24L, t1#25L, t2#26L]
> :     +- *(1) Filter isnotnull(key#24L)
> :        +- *(1) FileScan parquet default.a1[key#24L,t1#25L,t2#26L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct
> +- *(2) Sort [key#27L ASC NULLS FIRST], false, 0
>    +- *(2) Project [key#27L, t1#28L, t2#29L]
>       +- *(2) Filter isnotnull(key#27L)
>          +- *(2) FileScan parquet default.a2[key#27L,t1#28L,t2#29L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [IsNotNull(key)], ReadSchema: struct
> {code}
> But for aggregation after union we get a shuffle:
> {code}
> select key,count(*) from (select * from a1 union all select * from a2)z group by key
> == Physical Plan ==
> *(4) HashAggregate(keys=[key#25L], functions=[count(1)], output=[key#25L, count(1)#36L])
> +- Exchange hashpartitioning(key#25L, 1)
>    +- *(3) HashAggregate(keys=[key#25L], functions=[partial_count(1)], output=[key#25L, count#38L])
>       +- Union
>          :- *(1) Project [key#25L]
>          :  +- *(1) FileScan parquet default.a1[key#25L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a1], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
>          +- *(2) Project [key#28L]
>             +- *(2) FileScan parquet default.a2[key#28L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/some/where/spark-warehouse/a2], PartitionFilters: [], PushedFilters: [], ReadSchema: struct
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-09-14 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16614678#comment-16614678
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], I opened SPARK-25203 because of your answer on August 13th.
However, looking at your PR, it seems you opted to solve the general case, and 
the bucketed table is just a special case of it.
Do you think SPARK-25203 should be closed as a duplicate of this issue? If so, 
can you please add the example used there to this issue as well?




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-08-13 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578789#comment-16578789
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], my bad :)

It seems there are two distinct issues here: one is the general behavior of 
join/aggregate over unions, the other is the guarantees of bucketed 
partitioning.

Looking more carefully at the results of your query, it seems that the two DFs 
are not co-partitioned (which is a bit surprising), so my apologies.

That said, there is a more general issue with pushing down shuffle-related 
operations over a union. Do you think this deserves a separate 
issue?

 




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-08-13 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578763#comment-16578763
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

The above code shows that the two tables in the union result are located in 
logically different partitions, even though you know they might be physically 
co-partitioned. So we can't just get rid of the shuffle and expect correct 
results, because of `SparkContext.union`'s current implementation.

That is why cloud-fan suggested implementing Union with RDD.zip for 
certain cases, to preserve the children's output partitioning.

Although we can make Union smarter about its output partitioning, from the 
discussion you can see we might also need to consider parallelism and locality.
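
The difference between the two ways of combining children can be sketched with plain Scala collections. This is only a model, not Spark's RDD code: a dataset is represented as a vector of partitions, and `concatUnion`, `zipUnion` and `partitionsOf` are hypothetical helper names. The sample rows are the keys from the `spark_partition_id` output posted on this issue on 2018-05-30.

```scala
object UnionPartitionsSketch {
  // A dataset modelled as a vector of partitions, each holding some keys.
  type Partitions = Vector[Vector[Long]]

  // Concatenation: the second child's partition i lands at index left.length + i,
  // so equal keys from the two children end up in different output partitions.
  def concatUnion(left: Partitions, right: Partitions): Partitions = left ++ right

  // Partition-wise merge: output partition i holds partition i of both children,
  // which keeps equal keys together when the children are co-partitioned.
  def zipUnion(left: Partitions, right: Partitions): Partitions = {
    require(left.length == right.length, "children must have the same number of partitions")
    left.zip(right).map { case (l, r) => l ++ r }
  }

  // Indices of the partitions in which `key` appears.
  def partitionsOf(key: Long, parts: Partitions): Seq[Int] =
    parts.zipWithIndex.collect { case (rows, idx) if rows.contains(key) => idx }

  // Keys per bucket for a1 and a2, taken from the posted output.
  val a1: Partitions = Vector(Vector(6L, 7L, 3L), Vector(0L), Vector(4L, 8L, 9L, 2L, 1L, 5L))
  val a2: Partitions = Vector(Vector(3L, 7L, 6L), Vector(0L), Vector(2L, 9L, 5L, 8L, 1L, 4L))
}
```

With these inputs, `partitionsOf(6L, concatUnion(a1, a2))` yields partitions 0 and 3, while `partitionsOf(6L, zipUnion(a1, a2))` yields only partition 0. The merge halves the number of output partitions, which is where the parallelism concern comes from.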




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-08-13 Thread Eyal Farago (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578489#comment-16578489
 ] 

Eyal Farago commented on SPARK-24410:
-

[~viirya], I think your conclusion about co-partitioning is wrong. Take the 
following code segment from your comment:
{code:java}
val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}
This prints the partition ids as assigned by the union. Assuming union simply 
concatenates the partitions from df1 and df2, assigning them running ids, it 
makes sense that you'd get two partitions per key: one coming from df1 and the 
other from df2.

Applying this select to each dataframe separately, you'd get the exact same 
results, meaning a given key has the same partition id in both dataframes.

I think this code fragment basically shows what's wrong with the current 
implementation of Union, not that we can't optimize unions of co-partitioned 
relations.

If Union were a bit more 'partitioning aware', it would be able to identify 
that both children have the same partitioning scheme and 'inherit' it. As you 
actually showed, this might be a bit tricky, as the same logical attribute from 
different children has a different expression id, but Union eventually maps 
these child attributes into a single output attribute, so this information can 
be used to resolve the partitioning columns and determine their equality.
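
One way to picture this resolution step is to compare partitioning columns by their position in each child's output instead of by expression id. The sketch below is a simplified model and not Spark's `UnionExec`: `Attr`, `Child`, `partitioningOrdinals` and `inheritedPartitioning` are hypothetical names, and it assumes the union's output reuses the first child's attributes by position.

```scala
object UnionPartitioningSketch {
  // An attribute as printed in a plan, e.g. key#25L becomes Attr("key", 25).
  final case class Attr(name: String, exprId: Long)
  // A child described by its output columns and the columns it is hash-partitioned on.
  final case class Child(output: Seq[Attr], partitionedBy: Seq[Attr], numPartitions: Int)

  // Positions of the partitioning columns within the child's output, or None when
  // the child has no partitioning columns or one of them is not in its output.
  def partitioningOrdinals(c: Child): Option[Seq[Int]] = {
    val ords = c.partitionedBy.map(a => c.output.indexOf(a))
    if (ords.isEmpty || ords.contains(-1)) None else Some(ords)
  }

  // The union can inherit a partitioning only if every child is partitioned on the
  // same output positions with the same number of partitions. The result is
  // expressed in terms of the first child's attributes (an assumption of this sketch).
  def inheritedPartitioning(children: Seq[Child]): Option[(Seq[Attr], Int)] = {
    val described = children.map(c => partitioningOrdinals(c).map(ords => (ords, c.numPartitions)))
    described.headOption.flatten match {
      case Some((ords, n)) if described.forall(_ == Some((ords, n))) =>
        Some((ords.map(i => children.head.output(i)), n))
      case _ => None
    }
  }
}
```

For the two tables in this issue, `key#25L` and `key#28L` have different expression ids but both sit at output position 0, so the comparison by position treats them as the same partitioning column.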

Furthermore, Union being smarter about its output partitioning won't cut it; a 
few rules have to be added/modified:

 1. Applying an exchange on a union should sometimes be pushed to the children 
(children can be partitioned into those supporting the required partitioning 
and those not supporting it; the exchange can be applied to a union of the 
non-supporting children and then unioned with the rest of the children).
 2. Partial aggregation also has to be pushed to the children, resulting in a 
union of partial aggregations; again, it's possible to partition children 
according to their support of the required partitioning.
 3. Final aggregation over a union introduces an exchange which will then be 
pushed to the children; the aggregation is then applied on top of the 
partitioning-aware union (think of the way PartitionerAwareUnionRDD handles 
partitioning).

 * Partitioning children = partitioning an array by a predicate 
(scala.collection.TraversableLike#partition).
 * Other operators like join may require additional rules.
 * Some of these ideas were discussed offline with [~hvanhovell].
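
Rule 1 above can be sketched on a toy plan tree using the same `partition` call mentioned in the notes. Everything in the block is hypothetical (`Scan`, `Union`, `Exchange`, `pushExchange`); it ignores bucket counts and treats the resulting `Union` as if it were partitioning aware.

```scala
object PushExchangeSketch {
  sealed trait Plan
  // A leaf that is either hash-partitioned on one column or not partitioned at all.
  final case class Scan(table: String, partitionedBy: Option[String]) extends Plan
  final case class Union(children: Seq[Plan]) extends Plan
  // A shuffle to hash partitioning on `key`.
  final case class Exchange(key: String, child: Plan) extends Plan

  // Does `p` already deliver hash partitioning on `key`?
  private def supports(p: Plan, key: String): Boolean = p match {
    case Scan(_, Some(k)) => k == key
    case Exchange(k, _)   => k == key
    case _                => false
  }

  // Split the children by a predicate and shuffle only those that need it.
  // Children that already support the partitioning come first in the result;
  // the reordering is harmless for UNION ALL, which has bag semantics.
  def pushExchange(key: String, union: Union): Union = {
    val (ready, rest) = union.children.partition(supports(_, key))
    val shuffled: Seq[Plan] = rest match {
      case Seq()       => Seq.empty
      case Seq(single) => Seq(Exchange(key, single))
      case many        => Seq(Exchange(key, Union(many)))
    }
    Union(ready ++ shuffled)
  }
}
```

For the use case in this issue (a large bucketed aggregate plus a small unbucketed daily increment), only the increment would be shuffled.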


[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-06-05 Thread Apache Spark (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16501560#comment-16501560
 ] 

Apache Spark commented on SPARK-24410:
--

User 'viirya' has created a pull request for this issue:
https://github.com/apache/spark/pull/21498




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-31 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496178#comment-16496178
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

Yeah, it depends on how we combine the RDDs from Union's children. Currently 
{{SparkContext.union}} doesn't produce the results that this issue wants to 
have. So I will leave {{UnionExec#outputPartitioning}} untouched for now.




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-31 Thread Wenchen Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496147#comment-16496147
 ] 

Wenchen Fan commented on SPARK-24410:
-

[~viirya] thanks for your investigation!

I think one possible solution is to implement Union with RDD.zip for 
certain cases, to preserve the children's output partitioning, but we need to 
think about when we should do this and whether it causes a performance regression.





[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16496132#comment-16496132
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

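We can check which partition each row of the unioned DataFrame ends up in:
{code}
import org.apache.spark.sql.functions.spark_partition_id

val df1 = spark.table("a1").select(spark_partition_id(), $"key")
val df2 = spark.table("a2").select(spark_partition_id(), $"key")
// Tag each row of the union with the id of the partition it is read from.
df1.union(df2).select(spark_partition_id(), $"key").collect
{code}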
{code}
res8: Array[org.apache.spark.sql.Row] = Array([0,6], [0,7], [0,3], [1,0], 
[2,4], [2,8], [2,9], [2,2], [2,1], [2,5], [3,3], [3,7], [3,6], [4,0], [5,2], 
[5,9], [5,5], [5,8], [5,1], [5,4])
{code}

From the above result, we can see that rows with the same {{key}} from {{df1}} 
and {{df2}} end up in different partitions. E.g., key {{6}} is in partition 
{{0}} and in partition {{3}}.
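
As a cross-check of that reading, here is a small sketch in plain Scala (no Spark required) that takes the (partition id, key) pairs from the output above and lists every key that shows up in more than one partition. The names {{SplitKeys}}, {{rows}}, {{partitionsByKey}} and {{splitKeys}} are made up for this illustration.

```scala
object SplitKeys {
  // (partitionId, key) pairs copied from the collect() output above.
  val rows: Seq[(Int, Long)] = Seq(
    (0, 6L), (0, 7L), (0, 3L), (1, 0L),
    (2, 4L), (2, 8L), (2, 9L), (2, 2L), (2, 1L), (2, 5L),
    (3, 3L), (3, 7L), (3, 6L), (4, 0L),
    (5, 2L), (5, 9L), (5, 5L), (5, 8L), (5, 1L), (5, 4L))

  // For every key, the set of partitions it was observed in.
  val partitionsByKey: Map[Long, Set[Int]] =
    rows.groupBy(_._2).map { case (key, pairs) => key -> pairs.map(_._1).toSet }

  // Keys whose rows are spread over more than one partition.
  val splitKeys: Seq[Long] =
    partitionsByKey.collect { case (key, parts) if parts.size > 1 => key }.toSeq.sorted

  def main(args: Array[String]): Unit =
    println(splitKeys.mkString(", "))
}
```

On this data every key from 0 to 9 is reported, and each one sits in partitions p and p + 3. That matches a union that simply appends the three partitions of the second input after the three partitions of the first.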







[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-30 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494885#comment-16494885
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

I've done some experiments locally. The results show that, in the above case, 
there is no guarantee that the data distribution is the same when reading the 
two tables {{a1}} and {{a2}}. If the shuffle is simply removed, you do not get 
the correct results.
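
The effect can be reproduced with a toy model in plain Scala that does not use Spark at all. It is only a sketch under stated assumptions: the bucket contents are taken from the {{collect}} output quoted elsewhere in this thread, assuming partitions 0-2 of that output belong to {{a1}} and partitions 3-5 to {{a2}}; {{concatUnion}}, {{zipUnion}} and {{countWithoutShuffle}} are hypothetical helpers, not Spark operators.

```scala
object UnionLayoutSketch {
  type Partition = Seq[Long]

  // Keys per partition, copied from the collect() output in this thread
  // (assumption: partitions 0-2 come from a1, partitions 3-5 from a2).
  val a1: Seq[Partition] = Seq(Seq(6L, 7L, 3L), Seq(0L), Seq(4L, 8L, 9L, 2L, 1L, 5L))
  val a2: Seq[Partition] = Seq(Seq(3L, 7L, 6L), Seq(0L), Seq(2L, 9L, 5L, 8L, 1L, 4L))

  // Plain union: the partitions of the second input follow those of the first.
  def concatUnion(left: Seq[Partition], right: Seq[Partition]): Seq[Partition] =
    left ++ right

  // Hypothetical partition-aware union: output partition i merges partition i
  // of both inputs.
  def zipUnion(left: Seq[Partition], right: Seq[Partition]): Seq[Partition] = {
    require(left.length == right.length, "inputs must have the same number of partitions")
    left.zip(right).map { case (l, r) => l ++ r }
  }

  // Aggregate every partition on its own (no shuffle) and emit one
  // (key, count) row per group found in that partition.
  def countWithoutShuffle(parts: Seq[Partition]): Seq[(Long, Int)] =
    parts.flatMap(_.groupBy(identity).map { case (key, rows) => key -> rows.size })

  def main(args: Array[String]): Unit = {
    println(countWithoutShuffle(concatUnion(a1, a2)).filter(_._1 == 6L))
    println(countWithoutShuffle(zipUnion(a1, a2)).filter(_._1 == 6L))
  }
}
```

With the concatenating union, key {{6}} produces two output rows with count 1 each, which is wrong for a {{group by key}}. With the partition-wise merge it produces a single row with count 2.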




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-29 Thread Liang-Chi Hsieh (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16494391#comment-16494391
 ] 

Liang-Chi Hsieh commented on SPARK-24410:
-

[~cloud_fan] Thanks for pinging me. I'll look into this.




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-29 Thread Wenchen Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493637#comment-16493637
 ] 

Wenchen Fan commented on SPARK-24410:
-

The `UnionExec#outputPartitioning` should be smarter and propagate the 
children's output partitioning if possible. cc [~viirya]
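
One way to read "if possible" is sketched below in plain Scala. This is a toy model, not Spark code: {{Partitioning}}, {{Unknown}}, {{HashOn}} and {{unionOutputPartitioning}} are illustrative stand-ins and do not correspond to Spark's classes or to any actual patch. The assumed rule is that the union may report a hash partitioning only when every child reports the same one, compared by column position because the children's attribute ids differ (e.g. {{key#25L}} vs {{key#28L}} in the plan). As the other comments in this thread show, reporting it is only sound if the union also merges partition i of every child instead of appending the children's partitions one after another.

```scala
object UnionPartitioningSketch {
  // Toy stand-ins for illustration only; these are NOT Spark's classes.
  sealed trait Partitioning
  case object Unknown extends Partitioning
  // keyOrdinals: positions of the hash-key columns in the operator's output.
  final case class HashOn(keyOrdinals: Seq[Int], numPartitions: Int) extends Partitioning

  // Propagate the children's partitioning only if all children agree on it.
  def unionOutputPartitioning(children: Seq[Partitioning]): Partitioning =
    children match {
      case (first: HashOn) +: rest if rest.forall(_ == first) => first
      case _ => Unknown
    }

  def main(args: Array[String]): Unit = {
    // Two tables bucketed into 3 buckets on their first column.
    println(unionOutputPartitioning(Seq(HashOn(Seq(0), 3), HashOn(Seq(0), 3))))
    // Different bucket counts: nothing can be propagated.
    println(unionOutputPartitioning(Seq(HashOn(Seq(0), 3), HashOn(Seq(0), 4))))
  }
}
```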




[jira] [Commented] (SPARK-24410) Missing optimization for Union on bucketed tables

2018-05-29 Thread Ohad Raviv (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16493561#comment-16493561
 ] 

Ohad Raviv commented on SPARK-24410:


[~sowen], [~cloud_fan] - could you please check if my assessment is correct? 
thanks!
