Hi everyone,

When using raw RDDs, a map-style operation (e.g. mapPartitions() with 
preservesPartitioning = true) can declare that it preserves the RDD's 
partitioning. This makes it easier to reduce shuffle overhead, because RDDs 
that are joined can be kept co-partitioned.
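For reference, the RDD-level mechanism can be sketched roughly as below. This is a minimal illustration, assuming Spark is on the classpath; the object name PreservePartitioningDemo and the sample data are hypothetical. It compares the partitioner kept by mapPartitions(..., preservesPartitioning = true) with the one dropped by a plain map().

```scala
import org.apache.spark.{HashPartitioner, Partitioner, SparkContext}

// Hypothetical demo object; only the Spark calls inside are real API.
object PreservePartitioningDemo {
  // Returns the partitioner of a hash-partitioned pair RDD, of a
  // mapPartitions result that declares preservesPartitioning = true,
  // and of a plain map() result.
  def partitioners(sc: SparkContext)
      : (Option[Partitioner], Option[Partitioner], Option[Partitioner]) = {
    val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b", 3 -> "c"))
      .partitionBy(new HashPartitioner(4)) // hash-partition on the key

    // Keys are untouched, so it is safe to promise the partitioning is kept
    val kept = pairs.mapPartitions(
      iter => iter.map { case (k, v) => (k, v.toUpperCase) },
      preservesPartitioning = true)

    // map() cannot know the keys are unchanged, so the partitioner is dropped
    val lost = pairs.map { case (k, v) => (k, v.toUpperCase) }

    (pairs.partitioner, kept.partitioner, lost.partitioner)
  }
}
```

mapValues() gives the same guarantee without any flag, since it cannot change the keys.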

When I'm using DataFrames, I pre-partition the DataFrame on column X by calling 
DataFrame.repartition($"X"), but I then invoke a select statement after the 
partitioning and before joining that DataFrame with another. Roughly speaking, 
I'm doing something like this pseudo-code:

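import sqlContext.implicits._  // enables the $"..." column syntax

val partitionedDataFrame = dataFrame.repartition($"X")
// "aggregations" stands for the aggregate columns being computed
val groupedDataFrame = partitionedDataFrame.groupBy($"X").agg(aggregations)
// Rename "X" to "Y" to make sure columns are unique
val groupedDataFrameRenamed = groupedDataFrame.withColumnRenamed("X", "Y")
// Join on X == Y to get the aggregation results onto every row
val joinedDataFrame = partitionedDataFrame.join(
  groupedDataFrameRenamed,
  partitionedDataFrame("X") === groupedDataFrameRenamed("Y"))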

However, renaming a column translates into a select (a logical Project), and as 
far as I can tell, that select throws off the partitioning, so both 
partitionedDataFrame and the grouped DataFrame end up being shuffled for the join.

I have the following questions given this example:

1) Is pre-partitioning the DataFrame effective? In other words, does the 
physical planner recognize when the underlying RDDs are co-partitioned and plan 
more efficient joins that shuffle less data?
2) If the planner does take advantage of co-partitioning, does renaming the 
columns invalidate the partitioning of the grouped DataFrame? When I look at 
the planner's conversion from logical.Project to the physical plan, I only see 
it invoking child.mapPartitions without setting the preservesPartitioning 
flag.
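The intuition behind both questions can be sketched with a toy, Spark-free model in plain Scala. This is not how Spark's planner works internally; CoPartitionModel, its methods, and the "v"/"total" columns are hypothetical. It hash-partitions rows on X, aggregates within each partition, renames X to Y, and then joins partition i only with partition i. Because a rename moves no rows, the renamed side is still hash-partitioned on Y, and the partition-wise join matches a full join.

```scala
// Hypothetical toy model of co-partitioning; not Spark code.
object CoPartitionModel {
  // A row maps column names to integer values
  type Row = Map[String, Int]
  // A partitioned dataset: partition index -> rows in that partition
  type Partitioned = Vector[Vector[Row]]

  // Hash-partition rows on column `col` into n partitions
  def hashPartition(rows: Seq[Row], col: String, n: Int): Partitioned = {
    val buckets = rows.groupBy(r => Math.floorMod(r(col).hashCode, n))
    Vector.tabulate(n)(i => buckets.getOrElse(i, Seq.empty[Row]).toVector)
  }

  // Per-partition groupBy(key) with a sum of `value`: correct without a
  // shuffle because all rows sharing a key already live in one partition
  def sumPerKey(parts: Partitioned, key: String, value: String): Partitioned =
    parts.map { p =>
      p.groupBy(r => r(key)).toVector.map { case (k, rs) =>
        Map(key -> k, "total" -> rs.map(r => r(value)).sum)
      }
    }

  // Rename a column row by row; no row changes partition
  def rename(parts: Partitioned, from: String, to: String): Partitioned =
    parts.map(_.map(r => (r - from) + (to -> r(from))))

  // Co-partitioned join: partition i of the left meets only partition i
  // of the right, so no row has to move between partitions
  def coPartitionedJoin(left: Partitioned, lcol: String,
                        right: Partitioned, rcol: String): Seq[Row] =
    left.zip(right).flatMap { case (lp, rp) =>
      for (l <- lp; r <- rp if l(lcol) == r(rcol)) yield l ++ r
    }

  // Reference result: compare every left row with every right row
  def fullJoin(left: Seq[Row], lcol: String,
               right: Seq[Row], rcol: String): Seq[Row] =
    for (l <- left; r <- right if l(lcol) == r(rcol)) yield l ++ r
}
```

The partition-wise join is only valid because both sides use the same hash function and the same partition count; that is the property a planner would need to track through the rename.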

Thanks,

-Matt Cheah
