Hello Michael,

In Spark SQL, each physical operator has two internal concepts: Output
Partitioning (the partitioning scheme of the operator's output) and Required
Child Distribution (the data distribution the operator requires of its
input). Say we have two operators, parent and child, where the parent takes
the child's output as its input. At the end of the query planning process,
whenever the Output Partitioning of the child does not satisfy the Required
Child Distribution of the parent, we add an Exchange operator between the
parent and child to shuffle the data. Right now, we do not record the
partitioning scheme of an input table. So I think that even if you use
partitionBy (or DISTRIBUTE BY in SQL) to prepare your data, you will still
see the Exchange operator, and your GROUP BY operation will be executed in a
new stage (after the Exchange).
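
To make the rule above concrete, here is a rough, self-contained sketch in
plain Scala. It is a toy model only: the types and the Planner object are
hypothetical stand-ins made up for illustration, not Spark SQL's actual
classes, and the partition count 200 is just an example value.

```scala
// Toy model of the planning rule: insert an Exchange whenever the child's
// output partitioning does not satisfy the parent's required distribution.
// All names here are hypothetical; this is not Spark SQL's internal code.
sealed trait Distribution
case object Unspecified extends Distribution
final case class Clustered(columns: Seq[String]) extends Distribution

sealed trait Partitioning
case object UnknownPartitioning extends Partitioning
final case class HashPartitioning(columns: Seq[String], numPartitions: Int)
  extends Partitioning

sealed trait Plan { def outputPartitioning: Partitioning }

// A table scan; its partitioning is whatever the planner has recorded for it.
final case class Scan(table: String, outputPartitioning: Partitioning)
  extends Plan

// A shuffle that re-distributes the child's rows by hash.
final case class Exchange(target: HashPartitioning, child: Plan) extends Plan {
  def outputPartitioning: Partitioning = target
}

// A GROUP BY; it needs rows with equal grouping keys in the same partition.
final case class Aggregate(groupBy: Seq[String], child: Plan) extends Plan {
  val requiredChildDistribution: Distribution = Clustered(groupBy)
  def outputPartitioning: Partitioning = child.outputPartitioning
}

object Planner {
  // Hashing on a non-empty subset of the clustering columns is enough to
  // co-locate all rows that agree on the full set of clustering columns.
  def satisfies(p: Partitioning, d: Distribution): Boolean = (p, d) match {
    case (_, Unspecified) => true
    case (HashPartitioning(cols, _), Clustered(required)) =>
      cols.nonEmpty && cols.forall(required.contains)
    case _ => false
  }

  // Plan a GROUP BY over `child`, adding an Exchange only when needed.
  def planAggregate(groupBy: Seq[String], child: Plan,
                    shufflePartitions: Int): Aggregate = {
    val input =
      if (satisfies(child.outputPartitioning, Clustered(groupBy))) child
      else Exchange(HashPartitioning(groupBy, shufflePartitions), child)
    Aggregate(groupBy, input)
  }
}

object ExchangeDemo {
  def main(args: Array[String]): Unit = {
    val keys = Seq("CustomerCode")
    // Partitioning not recorded (the situation today): an Exchange is added.
    println(Planner.planAggregate(keys, Scan("Orders", UnknownPartitioning), 200))
    // Partitioning recorded and matching: the scan feeds the aggregate directly.
    println(Planner.planAggregate(
      keys, Scan("Orders", HashPartitioning(keys, 200)), 200))
  }
}
```

In this model the first plan contains an Exchange because the scan reports
an unknown partitioning, which is the case for input tables right now. The
second plan has no Exchange, but only because the toy scan is told how its
data is partitioned.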

Making Spark SQL aware of the partitioning scheme of input tables is a
useful optimization. I just created
https://issues.apache.org/jira/browse/SPARK-5354 to track it.

Thanks,

Yin



On Wed, Jan 21, 2015 at 8:43 AM, Michael Davies <
michael.belldav...@gmail.com> wrote:

> Hi Cheng,
>
> Are you saying that by setting up the lineage schemaRdd.keyBy(_.getString(
> 1)).partitionBy(new HashPartitioner(n)).values.applySchema(schema)
> then Spark SQL will know that an SQL “group by” on Customer Code will not
> have to shuffle?
>
> But the prepared RDD will already have been shuffled, so we pay an upfront
> cost for future groupings (assuming we cache, I suppose).
>
> Mick
>
> On 20 Jan 2015, at 20:44, Cheng Lian <lian.cs....@gmail.com> wrote:
>
>  First of all, even if the underlying dataset is partitioned as expected,
> a shuffle can’t be avoided, because Spark SQL knows nothing about the
> underlying data distribution. However, pre-partitioning does reduce
> network IO.
>
> You can prepare your data like this (say CustomerCode is a string field
> with ordinal 1):
>
> val schemaRdd = sql(...)
> val schema = schemaRdd.schema
> val prepared = schemaRdd.keyBy(_.getString(1))
>   .partitionBy(new HashPartitioner(n)).values.applySchema(schema)
>
> n should be equal to spark.sql.shuffle.partitions.
>
> Cheng
>
> On 1/19/15 7:44 AM, Mick Davies wrote:
>
>
>  Is it possible to use a HashPartitioner or something similar to distribute a
> SchemaRDD's data by the hash of a particular column or set of columns?
>
> Having done this, I would then hope that GROUP BY could avoid a shuffle.
>
> E.g. set up a HashPartitioner on the CustomerCode field so that
>
> SELECT CustomerCode, SUM(Cost)
> FROM Orders
> GROUP BY CustomerCode
>
> would not need to shuffle.
>
> Cheers
> Mick
