Ah, sorry, I am not too familiar with PySpark and I missed that part.  It
could be that PySpark doesn't properly support narrow dependencies, or
maybe you need to be more explicit about the partitioner.  I am looking
into the PySpark API, but you might have better guesses here than I do.

My suggestion to do

joinedRdd.getPartitions.foreach{println}

was just to see if the partition was a NarrowCoGroupSplitDep or a
ShuffleCoGroupSplitDep -- but it was actually a bad suggestion: those
fields are hidden deeper inside and are not user-visible.  I think a
better way (in Scala, anyway) is to look at rdd.dependencies.  It's a little
tricky, though, because you need to look deep into the lineage (example at
the end).

Sean -- yes, it does require that both RDDs have the same partitioner, but
that should happen naturally if you just specify the same number of
partitions: you'll get equal HashPartitioners.  There is a little difference
between the Scala & Python APIs that I missed here.  For partitionBy in
Scala, you actually need to specify the partitioner, but not in Python.
However, I thought it would work like groupByKey, which does just take an int.
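
To make the "same number of partitions gives the same placement" point
concrete, here is a minimal pure-Python sketch.  It is only a toy model:
partition_of, partition and is_copartitioned are hypothetical names made up
for illustration, and hash(key) % n merely stands in for whatever hash
function Spark or PySpark really uses.

```python
# Toy model of hash partitioning. NOT Spark's implementation: the helper
# names are hypothetical and Python's built-in hash() is a stand-in.

def partition_of(key, num_partitions):
    # With a positive modulus, Python's % never returns a negative index.
    return hash(key) % num_partitions

def partition(pairs, num_partitions):
    """Distribute (key, value) pairs into num_partitions buckets."""
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[partition_of(key, num_partitions)].append((key, value))
    return parts

def is_copartitioned(parts_a, parts_b):
    """True if every key present on both sides has the same partition index."""
    where_a = {k: i for i, part in enumerate(parts_a) for k, _ in part}
    where_b = {k: i for i, part in enumerate(parts_b) for k, _ in part}
    shared = where_a.keys() & where_b.keys()
    return all(where_a[k] == where_b[k] for k in shared)

a = partition([(x, x) for x in range(1, 1001)], 64)
b = partition([(x, x) for x in range(3, 1001)], 64)   # same partition count
c = partition([(x, x) for x in range(3, 1001)], 100)  # different count

print(is_copartitioned(a, b))  # matching keys already sit side by side
print(is_copartitioned(a, c))  # one side would have to be moved first
```

When the two sides agree, a join can pair up partition i with partition i
and nothing needs to move; when they disagree, at least one side has to be
redistributed, which is the shuffle.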


Here's a code example in scala -- not sure what is available from python.
Hopefully somebody knows a simpler way to confirm narrow dependencies??

val d = sc.parallelize(1 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
val d2 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(64)
scala> d.partitioner == d2.partitioner
res2: Boolean = true
val joined = d.join(d2)
val d3 = sc.parallelize(3 to 1e6.toInt).map{x => x -> x}.groupByKey(100)
val badJoined = d.join(d3)

d.setName("d")
d2.setName("d2")
d3.setName("d3")
joined.setName("joined")
badJoined.setName("badJoined")


//unfortunately, just looking at the immediate dependencies of joined &
//badJoined is misleading, b/c join actually creates one more step after
//the shuffle
scala> joined.dependencies
res20: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@74751ac8)
//even with the join that does require a shuffle, we still see a
//OneToOneDependency, but that's just a simple flatMap step
scala> badJoined.dependencies
res21: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.OneToOneDependency@1cf356cc)



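//so let's make a helper function to get all the dependencies recursively

import org.apache.spark.Dependency
import org.apache.spark.rdd.RDD

//pairs each RDD in the lineage with each of its dependencies; an RDD's own
//dependencies are listed before those of its parents
def flattenDeps(rdd: RDD[_]): Seq[(RDD[_], Dependency[_])] = {
  val deps = rdd.dependencies
  deps.map{rdd -> _} ++ deps.flatMap{dep => flattenDeps(dep.rdd)}
}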
//full dependencies of the good join

scala> flattenDeps(joined).foreach{println}
(joined FlatMappedValuesRDD[9] at join at <console>:16,org.apache.spark.OneToOneDependency@74751ac8)
(MappedValuesRDD[8] at join at <console>:16,org.apache.spark.OneToOneDependency@623264af)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@5a704f86)
(CoGroupedRDD[7] at join at <console>:16,org.apache.spark.OneToOneDependency@37514cd)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d2 ShuffledRDD[6] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@5960236d)
(MappedRDD[5] at map at <console>:12,org.apache.spark.OneToOneDependency@36b5f6f2)

//full dependencies of the bad join -- notice the ShuffleDependency on
//CoGroupedRDD[13]!

scala> flattenDeps(badJoined).foreach{println}
(badJoined FlatMappedValuesRDD[15] at join at <console>:16,org.apache.spark.OneToOneDependency@1cf356cc)
(MappedValuesRDD[14] at join at <console>:16,org.apache.spark.OneToOneDependency@5dea4db)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.ShuffleDependency@5c1928df)
(CoGroupedRDD[13] at join at <console>:16,org.apache.spark.OneToOneDependency@77ca77b5)
(d ShuffledRDD[3] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@7ba8a080)
(MappedRDD[2] at map at <console>:12,org.apache.spark.OneToOneDependency@7bc172ec)
(d3 ShuffledRDD[12] at groupByKey at <console>:12,org.apache.spark.ShuffleDependency@d794984)
(MappedRDD[11] at map at <console>:12,org.apache.spark.OneToOneDependency@15c98005)
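
For readers who do not have a Scala shell handy, the lineage walk can be
mimicked with a small pure-Python toy model.  Nothing here touches Spark:
Node, Dep, grouped, join and join_shuffles are hypothetical names invented
for this sketch, and the dependency kinds are simply copied from the two
printouts above rather than computed by any partitioner logic.

```python
# Toy lineage graph mirroring the two joins above. All classes and helpers
# are hypothetical; this does not call Spark or PySpark.
from dataclasses import dataclass, field
from typing import List

@dataclass
class Dep:
    kind: str        # "OneToOne" (narrow) or "Shuffle"
    parent: "Node"

@dataclass
class Node:
    name: str
    deps: List[Dep] = field(default_factory=list)

def flatten_deps(node):
    """List (node name, dependency kind) for the whole lineage.

    Same order as the Scala helper: a node's own dependencies first,
    then everything reachable through each parent.
    """
    own = [(node.name, d.kind) for d in node.deps]
    inherited = [pair for d in node.deps for pair in flatten_deps(d.parent)]
    return own + inherited

def grouped(name):
    # parallelize -> map -> groupByKey: the groupByKey step itself shuffles
    source = Node(name + " source")
    mapped = Node(name + " map", [Dep("OneToOne", source)])
    return Node(name, [Dep("Shuffle", mapped)])

def join(name, left, right, left_kind, right_kind):
    # join = cogroup, then two narrow steps on top of it
    cogroup = Node(name + " cogroup",
                   [Dep(left_kind, left), Dep(right_kind, right)])
    mapped = Node(name + " mapValues", [Dep("OneToOne", cogroup)])
    return Node(name, [Dep("OneToOne", mapped)])

def join_shuffles(joined):
    """True if the cogroup step inside the join has a shuffle dependency."""
    return any(kind == "Shuffle"
               for name, kind in flatten_deps(joined)
               if name.endswith("cogroup"))

d, d2, d3 = grouped("d"), grouped("d2"), grouped("d3")
joined = join("joined", d, d2, "OneToOne", "OneToOne")
bad_joined = join("badJoined", d, d3, "Shuffle", "OneToOne")

# The immediate dependency looks identical for both joins...
print(joined.deps[0].kind, bad_joined.deps[0].kind)
# ...only the cogroup step deeper in the lineage tells them apart.
print(join_shuffles(joined), join_shuffles(bad_joined))
```

Note that simply searching the lineage for any shuffle is not enough, since
both joins contain the shuffles from groupByKey.  The check has to look at
the dependencies of the cogroup step specifically.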



On Thu, Feb 12, 2015 at 10:05 AM, Karlson <ksonsp...@siberie.de> wrote:

> Hi Imran,
>
> thanks for your quick reply.
>
> Actually I am doing this:
>
>     rddA = rddA.partitionBy(n).cache()
>     rddB = rddB.partitionBy(n).cache()
>
> followed by
>
>     rddA.count()
>     rddB.count()
>
> then joinedRDD = rddA.join(rddB)
>
> I thought that the count() would force the evaluation, so any subsequent
> joins would be shuffleless. I was wrong about the shuffle amounts however.
> The shuffle write is actually 2GB (i.e. the size of the bigger RDD) while
> there is no Shuffle read. A joinedRdd.count() does a shuffle read of about
> 1GB in size, though.
>
> The getPartitions-method does not exist on the resulting RDD (I am using
> the Python API). There is however foreachPartition(). What is the line
>
>     joinedRdd.getPartitions.foreach{println}
>
> supposed to do?
>
> Thank you,
>
> Karlson
>
> PS: Sorry for sending this twice, I accidentally did not reply to the
> mailing list first.
>
>
>
> On 2015-02-12 16:48, Imran Rashid wrote:
>
>> Hi Karlson,
>>
>> I think your assumptions are correct -- that join alone shouldn't require
>> any shuffling.  But it's possible you are getting tripped up by lazy
>> evaluation of RDDs.  After you do your partitionBy, are you sure those
>> RDDs are actually materialized & cached somewhere?  E.g., if you just did
>> this:
>>
>> val rddA = someData.partitionBy(N)
>> val rddB = someOtherData.partitionBy(N)
>> val joinedRdd = rddA.join(rddB)
>> joinedRdd.count() //or any other action
>>
>> then the partitioning isn't actually getting run until you do the join.
>> So though the join itself can happen without partitioning,
>> joinedRdd.count() will trigger the evaluation of rddA & rddB which will
>> require shuffles.  Note that even if you have some intervening action on
>> rddA & rddB that shuffles them, unless you persist the result, you will
>> need to reshuffle them for the join.
>>
>> If this doesn't help explain things, for debugging
>>
>> joinedRdd.getPartitions.foreach{println}
>>
>> this is getting into the weeds, but at least this will tell us whether or
>> not you are getting narrow dependencies, which would avoid the shuffle.
>>  (Does anyone know of a simpler way to check this?)
>>
>> hope this helps,
>> Imran
>>
>>
>>
>>
>> On Thu, Feb 12, 2015 at 9:25 AM, Karlson <ksonsp...@siberie.de> wrote:
>>
>>> Hi All,
>>>
>>> using Pyspark, I create two RDDs (one with about 2M records (~200MB), the
>>> other with about 8M records (~2GB)) of the format (key, value).
>>>
>>> I've done a partitionBy(num_partitions) on both RDDs and verified that
>>> both RDDs have the same number of partitions and that equal keys reside
>>> on the same partition (via mapPartitionsWithIndex).
>>>
>>> Now I'd expect that for a join on the two RDDs no shuffling is necessary.
>>> Looking at the Web UI under http://driver:4040 however reveals that that
>>> assumption is false.
>>>
>>> In fact I am seeing shuffle writes of about 200MB and reads of about
>>> 50MB.
>>>
>>> What's the explanation for that behaviour? Where am I wrong with my
>>> assumption?
>>>
>>> Thanks in advance,
>>>
>>> Karlson
>>>
>>> ---------------------------------------------------------------------
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>>