Hi,

I believe that partitionBy will use the same (default) partitioner on both RDDs.
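For example (a minimal Scala sketch with placeholder names, in the style of Imran's example below; HashPartitioner compares equal by number of partitions):

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(100)           // any fixed partition count
val rddA = someData.partitionBy(part)         // someData, someOtherData are placeholders
val rddB = someOtherData.partitionBy(part)
assert(rddA.partitioner == rddB.partitioner)  // both sides report Some(part)

If the two partitioners compare equal, the join can plan narrow dependencies on both parents.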

On 2015-02-12 17:12, Sean Owen wrote:
Doesn't this require that both RDDs have the same partitioner?

On Thu, Feb 12, 2015 at 3:48 PM, Imran Rashid <iras...@cloudera.com> wrote:
Hi Karlson,

I think your assumptions are correct -- a join alone shouldn't require
any shuffling. But it's possible you are getting tripped up by the lazy
evaluation of RDDs. After you do your partitionBy, are you sure those RDDs are actually materialized and cached somewhere? E.g., if you just did this:

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(N)      // partitionBy takes a Partitioner, not an Int
val rddA = someData.partitionBy(part)
val rddB = someOtherData.partitionBy(part)
val joinedRdd = rddA.join(rddB)
joinedRdd.count() // or any other action

then the partitioning isn't actually run until you perform the action. So although the join itself can be planned without a shuffle, joinedRdd.count() will trigger the evaluation of rddA and rddB, and their partitionBy steps will still require shuffles.
Note that even if some intervening action has already shuffled rddA and rddB,
unless you persist the result, you will need to reshuffle
them for the join.
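For instance (a sketch, reusing the hypothetical names above), persisting right after the partitionBy pays each shuffle exactly once:

import org.apache.spark.HashPartitioner

val part = new HashPartitioner(N)
val rddA = someData.partitionBy(part).cache()
val rddB = someOtherData.partitionBy(part).cache()
rddA.count()                    // materializes (and shuffles) each side once
rddB.count()
val joinedRdd = rddA.join(rddB) // co-partitioned and cached: narrow dependency
joinedRdd.count()               // the join itself needs no further shuffle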

If this doesn't help explain things, then for debugging you could try:

joinedRdd.partitions.foreach(println)

This is getting into the weeds, but at least it should tell us whether or not you are getting narrow dependencies, which would avoid the shuffle.
(Does anyone know of a simpler way to check this?)
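One option (a sketch) is to print the lineage: Spark indents each stage in the output, and a shuffle boundary shows up as a ShuffledRDD at a new indentation level, so a narrow join keeps the cogroup and both parents in the same stage:

println(joinedRdd.toDebugString)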

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:

Hi All,

Using PySpark, I create two RDDs (one with about 2M records (~200MB), the
other with about 8M records (~2GB)) of the format (key, value).

I've called partitionBy(num_partitions) on both RDDs and verified that both RDDs have the same number of partitions and that equal keys reside in
the same partition (via mapPartitionsWithIndex).
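Roughly, the check looks like this (a sketch, shown in Scala here for consistency with the example above, though my code is PySpark; rddA and rddB are placeholders):

val tagA = rddA.mapPartitionsWithIndex { (i, iter) =>
  iter.map { case (k, _) => (k, i) }   // tag each key with its partition index
}
val tagB = rddB.mapPartitionsWithIndex { (i, iter) =>
  iter.map { case (k, _) => (k, i) }
}
// Count keys that landed in different partitions of the two RDDs.
val mismatches = tagA.join(tagB).filter { case (_, (ia, ib)) => ia != ib }.count()

Here mismatches comes out as 0.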

Now I'd expect that no shuffling is necessary for a join on the two RDDs. Looking at the Web UI under http://driver:4040, however, reveals that this
assumption is false.

In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.

What's the explanation for that behaviour? Where does my assumption go
wrong?

Thanks in advance,

Karlson
