Hi Imran,

thanks for your quick reply.

Actually I am doing this:

    rddA = rddA.partitionBy(n).cache()
    rddB = rddB.partitionBy(n).cache()

followed by

    rddA.count()
    rddB.count()

and then

    joinedRdd = rddA.join(rddB)

I thought that the count() would force the evaluation, so any subsequent joins would be shuffleless. I was wrong about the shuffle amounts, however. The shuffle write is actually 2GB (i.e. the size of the bigger RDD) while there is no shuffle read. A joinedRdd.count() does a shuffle read of about 1GB in size, though.

The getPartitions method does not exist on the resulting RDD (I am using the Python API). There is, however, foreachPartition(). What is the line

    joinedRdd.getPartitions.foreach{println}

supposed to do?

Thank you,

Karlson

PS: Sorry for sending this twice, I accidentally did not reply to the mailing list first.


On 2015-02-12 16:48, Imran Rashid wrote:
Hi Karlson,

I think your assumptions are correct -- that join alone shouldn't require
any shuffling. But it's possible you are getting tripped up by lazy
evaluation of RDDs. After you do your partitionBy, are you sure those RDDs
are actually materialized & cached somewhere? E.g., if you just did this:

    val rddA = someData.partitionBy(N)
    val rddB = someOtherData.partitionBy(N)
    val joinedRdd = rddA.join(rddB)
    joinedRdd.count() // or any other action

then the partitioning isn't actually getting run until you do the join. So
though the join itself can happen without repartitioning, joinedRdd.count()
will trigger the evaluation of rddA & rddB, which will require shuffles.
Note that even if you have some intervening action on rddA & rddB that
shuffles them, unless you persist the result, you will need to reshuffle
them for the join.
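
To make the point about persisting concrete, here is a toy model in plain Python. It is not Spark: LazyDataset and expensive_shuffle are hypothetical names made up for this sketch. It only mimics the idea that an action re-runs the upstream work unless the result was cached.

```python
class LazyDataset:
    """Toy stand-in for an RDD: work runs only when an action is called."""

    def __init__(self, compute):
        self._compute = compute   # function that produces the data
        self._persist = False     # set by cache()
        self._cached = None       # materialized data, if persisted
        self.runs = 0             # how many times the work was executed

    def cache(self):
        # Like RDD.cache(): only marks the dataset, nothing runs yet.
        self._persist = True
        return self

    def collect(self):
        if self._cached is not None:
            return self._cached
        self.runs += 1
        data = self._compute()
        if self._persist:
            self._cached = data
        return data

    def count(self):
        # An "action": forces evaluation.
        return len(self.collect())


def expensive_shuffle():
    # Placeholder for the costly partitionBy step.
    return sorted([(3, "c"), (1, "a"), (2, "b")])


uncached = LazyDataset(expensive_shuffle)
uncached.count()
uncached.count()

cached = LazyDataset(expensive_shuffle).cache()
cached.count()
cached.count()

print(uncached.runs, cached.runs)  # prints: 2 1
```

Without cache(), every action repeats the expensive step. With it, the first action pays the cost and later ones reuse the result.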

If this doesn't help explain things, for debugging you could try:

    joinedRdd.getPartitions.foreach{println}

this is getting into the weeds, but at least this will tell us whether or
not you are getting narrow dependencies, which would avoid the shuffle.
(Does anyone know of a simpler way to check this?)

hope this helps,
Imran




On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:

Hi All,

Using PySpark, I create two RDDs (one with about 2M records (~200MB), the
other with about 8M records (~2GB)) of the format (key, value).

I've done a partitionBy(num_partitions) on both RDDs and verified that
both RDDs have the same number of partitions and that equal keys reside on
the same partition (via mapPartitionsWithIndex).

Now I'd expect that for a join on the two RDDs no shuffling is necessary. Looking at the Web UI under http://driver:4040 however reveals that that
assumption is false.
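
The expectation can be illustrated with a small plain-Python sketch (not Spark code; partition_by and local_join are hypothetical helpers written for this example). If both datasets are split with the same hash function and the same number of partitions, equal keys end up at the same partition index, so each pair of partitions can be joined locally.

```python
from collections import defaultdict


def partition_by(pairs, n):
    """Split (key, value) pairs into n buckets by hash(key) % n."""
    parts = [[] for _ in range(n)]
    for key, value in pairs:
        parts[hash(key) % n].append((key, value))
    return parts


def local_join(part_a, part_b):
    """Join two partitions that are known to hold the same key range."""
    index = defaultdict(list)
    for key, value in part_a:
        index[key].append(value)
    return [
        (key, (va, vb))
        for key, vb in part_b
        for va in index.get(key, [])
    ]


n = 4  # same partition count for both datasets
a = [(i, "a%d" % i) for i in range(10)]
b = [(i, "b%d" % i) for i in range(0, 10, 2)]

parts_a = partition_by(a, n)
parts_b = partition_by(b, n)

# Partition i of a is only ever matched with partition i of b,
# so no record has to move to another partition.
joined = [
    row
    for pa, pb in zip(parts_a, parts_b)
    for row in local_join(pa, pb)
]
print(sorted(joined))
```

The result matches a join over the full datasets, which is why a join of two identically partitioned RDDs should not need a shuffle in principle.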

In fact I am seeing shuffle writes of about 200MB and reads of about 50MB.

What's the explanation for that behaviour? Where am I wrong with my
assumption?

Thanks in advance,

Karlson

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org


