Re: Preserving partitioning with dataframe select

2016-02-09 Thread Michael Armbrust
RDD-level partitioning information is not used to decide when to shuffle
for queries planned using Catalyst (since we have better information about
distribution from the query plan itself). Instead, you should be looking at
the logic in EnsureRequirements.
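As a rough illustration of the kind of check EnsureRequirements performs, here is a toy model in plain Scala. The case classes are simplified stand-ins named after Catalyst concepts, not Spark's actual classes. The rule used here (hash partitioning on a subset of the required clustering columns is enough) is an assumption for the example, and it ignores details such as matching partition counts between the two sides of a join.

```scala
// Toy model of a shuffle decision. These types are simplified stand-ins,
// not Spark's own Partitioning / Distribution classes.
sealed trait Partitioning
case class HashPartitioning(columns: Seq[String], numPartitions: Int) extends Partitioning
case object UnknownPartitioning extends Partitioning

// What a parent operator (e.g. one side of an equi-join) needs from its child:
// all rows with equal values in `clustering` must be in the same partition.
case class ClusteredDistribution(clustering: Seq[String])

object ToyEnsureRequirements {
  // Hashing on a subset of the clustering columns is sufficient: rows that
  // agree on every clustering column also agree on that subset, so they hash
  // to the same partition.
  def satisfies(p: Partitioning, required: ClusteredDistribution): Boolean = p match {
    case HashPartitioning(cols, _) => cols.forall(required.clustering.contains)
    case UnknownPartitioning       => false
  }

  // Keep the child's partitioning if it already satisfies the requirement;
  // otherwise plan a shuffle (an "exchange") on the required columns.
  // Returns the resulting partitioning and whether a shuffle was added.
  def ensure(child: Partitioning, required: ClusteredDistribution, numPartitions: Int): (Partitioning, Boolean) =
    if (satisfies(child, required)) (child, false)
    else (HashPartitioning(required.clustering, numPartitions), true)

  def main(args: Array[String]): Unit = {
    val joinKey = ClusteredDistribution(Seq("X"))
    // Already hash-partitioned on X: no shuffle is added.
    println(ensure(HashPartitioning(Seq("X"), 200), joinKey, 200))
    // Nothing known about the layout: a shuffle on X is added.
    println(ensure(UnknownPartitioning, joinKey, 200))
  }
}
```

The key point of the model is that the decision only looks at plan metadata (the child's reported partitioning), never at the RDD that will eventually be built.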

We don't yet reason about equivalence classes for attributes when deciding
if a given partitioning is valid, but #10844 is a start at building that
infrastructure.
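A toy sketch of what reasoning about one kind of attribute equivalence could look like: a rename such as withColumnRenamed("X", "Y") becomes a projection that aliases X as Y, and a planner that rewrites the child's partitioning through those aliases can tell that data hashed on X is also hashed on Y. All names below are hypothetical, and this is not meant to describe what #10844 implements.

```scala
// Toy model (hypothetical names, not Spark code) of carrying a child's hash
// partitioning through a projection that renames columns.
case class HashPartitioning(columns: Seq[String], numPartitions: Int)

object ToyAliasPropagation {
  // A requirement that rows with equal `clustering` values share a partition
  // is met when the data is hashed on a subset of those columns.
  def satisfiesClustering(p: HashPartitioning, clustering: Seq[String]): Boolean =
    p.columns.forall(clustering.contains)

  // projectList maps each output column name to the input column it reads
  // (plain references and renames only). Every partitioning column must
  // survive under some output name; otherwise the partitioning is unknown.
  def aliasAwareOutput(child: HashPartitioning, projectList: Map[String, String]): Option[HashPartitioning] = {
    val renamed = child.columns.map(col => projectList.collectFirst { case (out, in) if in == col => out })
    if (renamed.forall(_.isDefined)) Some(child.copy(columns = renamed.flatten)) else None
  }

  def main(args: Array[String]): Unit = {
    val afterAggregate = HashPartitioning(Seq("X"), 200)
    // withColumnRenamed("X", "Y") on a frame with columns X and total
    val rename = Map("Y" -> "X", "total" -> "total")
    // Described only in terms of X, the layout says nothing about Y: false
    println(satisfiesClustering(afterAggregate, Seq("Y")))
    // After rewriting through the alias, it is recognised as hashed on Y: true
    println(aliasAwareOutput(afterAggregate, rename).exists(satisfiesClustering(_, Seq("Y"))))
  }
}
```

Returning None when a partitioning column is projected away is the conservative choice: the planner then falls back to adding a shuffle rather than assuming a layout it cannot justify.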


Re: Preserving partitioning with dataframe select

2016-02-08 Thread Matt Cheah
Interesting. I might be misinterpreting my Spark UI, then, in terms of the
number of stages I'm seeing in the job before and after I do the
pre-partitioning.

That said, I was mostly thinking about this when reading through the code.
In particular, in basicOperators.scala under org.apache.spark.sql.execution,
Project gets compiled down to child.execute().mapPartitionsInternal
without passing the preservesPartitioning flag. Is this Projection being
moved around when the optimizer wants to take advantage of
co-partitioning? Guidance on how to trace the planner's logic would be
appreciated!
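To illustrate the distinction between an RDD flag and plan-level information, here is a toy physical plan in plain Scala in which each operator reports its output partitioning as plan metadata, so a projection's partitioning is decided by the planner rather than by a preservesPartitioning flag on the RDD it later builds. Counting exchange nodes is used as a rough proxy for shuffle boundaries (the extra stages visible in the Spark UI). All class names are simplified stand-ins, not Spark's operators, and the rule that a projection keeps its child's partitioning only while every partitioning column remains in its output is an assumption made for the example.

```scala
// Toy physical plan (simplified stand-ins, not Spark's operators). Each node
// reports its output partitioning as plan metadata: Some(keys) means "hash
// partitioned on keys", None means "unknown".
sealed trait Plan { def outputPartitioning: Option[Seq[String]] }

case object Scan extends Plan {
  val outputPartitioning: Option[Seq[String]] = None
}

case class Exchange(keys: Seq[String], child: Plan) extends Plan {
  // A shuffle that hash-partitions its output on `keys`.
  val outputPartitioning: Option[Seq[String]] = Some(keys)
}

case class Project(columns: Seq[String], child: Plan) extends Plan {
  // The planner reads this value; it does not look at how the operator's RDD
  // is built at execution time. The child's partitioning is kept only while
  // every partitioning column is still part of the output.
  val outputPartitioning: Option[Seq[String]] =
    child.outputPartitioning.filter(_.forall(columns.contains))
}

object ToyPlanTrace {
  // Add a shuffle on `keys` unless the plan is already hash partitioned on a
  // subset of them.
  def requireClustering(plan: Plan, keys: Seq[String]): Plan =
    if (plan.outputPartitioning.exists(_.forall(keys.contains))) plan
    else Exchange(keys, plan)

  // Each Exchange marks a shuffle boundary, i.e. roughly one extra stage.
  def countExchanges(plan: Plan): Int = plan match {
    case Scan               => 0
    case Exchange(_, child) => 1 + countExchanges(child)
    case Project(_, child)  => countExchanges(child)
  }

  def main(args: Array[String]): Unit = {
    val prePartitioned = Exchange(Seq("X"), Scan)
    // Projection keeps X: the existing shuffle is reused, 1 exchange in total.
    println(countExchanges(requireClustering(Project(Seq("X", "Z"), prePartitioned), Seq("X"))))
    // Projection outputs Y (a renamed X), but this toy has no alias tracking,
    // so the layout is unknown and a second shuffle is planned: 2 in total.
    println(countExchanges(requireClustering(Project(Seq("Y"), prePartitioned), Seq("Y"))))
  }
}
```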

-Matt Cheah








Re: Preserving partitioning with dataframe select

2016-02-07 Thread Reynold Xin
Matt,

Thanks for the email. Are you just asking whether it should work, or
reporting that they don't work?

Internally, the way we track physical data distribution should make the
scenarios described work. If it doesn't, we should make them work.




Preserving partitioning with dataframe select

2016-02-05 Thread Matt Cheah
Hi everyone,

When using raw RDDs, it is possible to have a map-style operation
(mapPartitions() with preservesPartitioning = true) indicate that the RDD's
partitioning is preserved by the operation. This makes it easier to reduce
shuffle overhead by ensuring that RDDs are co-partitioned when they are joined.
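A small model of the RDD-level bookkeeping described above, in plain Scala so it runs without Spark. In Spark itself, map() drops a pair RDD's partitioner, mapPartitions() keeps it only when called with preservesPartitioning = true, and a join can avoid re-shuffling an input whose partitioner already matches. ToyRDD, ToyHashPartitioner and joinNeedsShuffle below are hypothetical simplifications of that behaviour, not Spark's classes.

```scala
// Toy model of partitioner bookkeeping on a pair RDD (not Spark's RDD API).
case class ToyHashPartitioner(numPartitions: Int)

case class ToyRDD[T](data: Seq[T], partitioner: Option[ToyHashPartitioner]) {
  // Like RDD.map: the function may change keys, so the partitioner is dropped.
  def map[U](f: T => U): ToyRDD[U] = ToyRDD(data.map(f), None)

  // Like RDD.mapPartitions: the caller can promise that keys are unchanged.
  def mapPartitions[U](f: Iterator[T] => Iterator[U],
                       preservesPartitioning: Boolean = false): ToyRDD[U] =
    ToyRDD(f(data.iterator).toList, if (preservesPartitioning) partitioner else None)
}

object ToyCoPartitioning {
  // Simplification: a join needs no shuffle only when both inputs carry the
  // same known partitioner (so matching keys already share a partition index).
  def joinNeedsShuffle(a: ToyRDD[_], b: ToyRDD[_]): Boolean =
    a.partitioner.isEmpty || a.partitioner != b.partitioner

  def main(args: Array[String]): Unit = {
    val base = ToyRDD(Seq(1 -> "a", 2 -> "b"), Some(ToyHashPartitioner(4)))
    val viaMap = base.map { case (k, v) => (k, v.toUpperCase) }
    val viaMapPartitions =
      base.mapPartitions(_.map { case (k, v) => (k, v.toUpperCase) }, preservesPartitioning = true)
    println(joinNeedsShuffle(base, viaMap))           // true: partitioner was dropped
    println(joinNeedsShuffle(base, viaMapPartitions)) // false: partitioner was kept
  }
}
```

Note that the flag is only a promise: if the function passed with preservesPartitioning = true actually changed the keys, the recorded partitioner would be wrong and a later join could produce incorrect results.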

When I'm using DataFrames, I'm pre-partitioning the DataFrame by using
DataFrame.repartition($"X"), but I then invoke a select statement after the
partitioning and before joining that DataFrame with another. Roughly speaking,
I'm doing something like this pseudo-code:

import sqlContext.implicits._  // enables the $"X" column syntax

val partitionedDataFrame = dataFrame.repartition($"X")
// `aggregations` stands for the real aggregate expressions
val groupedDataFrame = partitionedDataFrame.groupBy($"X").agg(aggregations)
// Rename "X" to "Y" to make sure columns are unique
val groupedDataFrameRenamed = groupedDataFrame.withColumnRenamed("X", "Y")
// Roughly speaking, join on "X == Y" to get the aggregation results onto every row
val joinedDataFrame =
  partitionedDataFrame.join(groupedDataFrameRenamed, $"X" === $"Y")
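To make the intent of the pseudo-code concrete, and to show why co-partitioning can pay off, here is a plain-Scala sketch with no Spark involved. It assumes the placeholder aggregations is a sum over a hypothetical column z, and it models partitions as in-memory buckets. Once every row for a given key sits in one bucket, the group-by, the rename and the join back onto every row can all run bucket by bucket without moving any rows.

```scala
// Plain-Scala sketch, no Spark: hypothetical rows with a key x and a value z.
case class Record(x: String, z: Long)

object LocalAggregateJoin {
  // Hash-partition rows on x: every row with the same key lands in the same bucket.
  def hashPartition(rows: Seq[Record], n: Int): Vector[Seq[Record]] =
    Vector.tabulate(n)(i => rows.filter(r => java.lang.Math.floorMod(r.x.hashCode, n) == i))

  // Within one bucket: groupBy(x) with sum(z), then attach each key's total to
  // every row of that key (the rename-and-join step of the pseudo-code).
  def perPartition(part: Seq[Record]): Seq[(Record, Long)] = {
    val totals: Map[String, Long] = part.groupBy(_.x).map { case (k, rs) => k -> rs.map(_.z).sum }
    part.map(r => r -> totals(r.x))
  }

  // No row crosses buckets after the initial partitioning.
  def run(rows: Seq[Record], n: Int): Seq[(Record, Long)] =
    hashPartition(rows, n).flatMap(perPartition)

  def main(args: Array[String]): Unit = {
    val rows = Seq(Record("a", 1), Record("b", 2), Record("a", 3))
    run(rows, 4).foreach(println)
  }
}
```

The per-bucket result matches a global computation only because hashPartition put all rows of a key into one bucket; that is the property the planner has to be able to prove before it can skip a shuffle.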

However, the renaming of the columns maps to a select statement, and to my
knowledge, that select is throwing off the partitioning, which results in
both partitionedDataFrame and groupedDataFrame being shuffled.

I have the following questions given this example:

1) Is pre-partitioning the DataFrame effective? In other words, does the
physical planner recognize when the underlying RDDs are co-partitioned and
compute more efficient joins by reducing the amount of data that is shuffled?
2) If the planner does take advantage of co-partitioning, does renaming the
columns invalidate the partitioning of the grouped DataFrame? When I look at
the planner's conversion from logical.Project to the physical plan, I only see
it invoking child.mapPartitions without specifying the preservesPartitioning
flag.

Thanks,

-Matt Cheah