I think you're getting tripped up by lazy evaluation and the way stage
boundaries work (admittedly it's pretty confusing in this case).

It is true that up until recently, if you unioned two RDDs with the same
partitioner, the result did not have the same partitioner.  But that was
just fixed here:
https://github.com/apache/spark/pull/4629
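
For reference, here is a quick way to check that in the spark-shell -- an
illustrative snippet, not from your code, and the Some(...) result assumes a
build that includes that fix:

  import org.apache.spark.HashPartitioner

  // Two small RDDs (made-up data) that share a partitioner.
  val p = new HashPartitioner(6)
  val a = sc.parallelize(Seq((1, 1.0), (2, 1.0))).partitionBy(p)
  val b = sc.parallelize(Seq((1, 0.2), (2, 0.2))).partitionBy(p)

  // Without the fix this prints None; with it, the union keeps the shared
  // partitioner and prints Some(...).
  println(a.union(b).partitioner)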

That does mean that, without that fix, after you update ranks it will no
longer have a partitioner, which will affect the join on your second
iteration here:
 val contributions = links.join(ranks).flatMap

But I think most of the shuffles you are pointing to are a different issue.
I may be belaboring something you already know, but I think this is easily
confusing.  The first thing is understanding where you get stage boundaries,
and how stages are named.  Each shuffle introduces a stage boundary.
However, a stage gets named after the last operation in that stage, which is
not necessarily what caused the shuffle.  E.g., reduceByKey() causes a
shuffle, but we don't see it in a stage name.  Similarly, map() does not
cause a shuffle, but we see a stage with that name.
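
If it helps to see that concretely, here is a tiny made-up example (not your
page rank code).  RDD.toDebugString prints each shuffle dependency at a new
indentation level, which is exactly where a stage boundary falls:

  val words = sc.parallelize(Seq("a", "b", "a", "c"))
  val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

  // The map runs in the stage that *writes* the shuffle files for
  // reduceByKey; the next stage reads them.  The indented ShuffledRDD in the
  // output marks that boundary.
  println(counts.toDebugString)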

So, what do the stage boundaries we see actually correspond to?

1) map -- this is doing the shuffle write for the following groupByKey
2) groupByKey -- in addition to reading the shuffle output from your map,
this is *also* doing the shuffle write for the next shuffle you introduce
w/ partitionBy
3) union -- this is doing the shuffle read from your partitionBy, and then
all the work from there right up until the shuffle write for what comes
immediately after union -- your reduceByKey
4) lookup -- this is an action, which is why it gets another stage

A couple of things to note:
(a) your join does not cause a shuffle, b/c both RDDs share a partitioner
(b) you have two shuffles from groupByKey followed by partitionBy -- you
probably want the one-arg form, groupByKey(partitioner) (see the sketch
below)
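
For (b), here is a minimal sketch of that change, reusing the names and file
path from your code quoted below (numOfPartition, web-Google.txt, etc. are
your values):

  import org.apache.spark.HashPartitioner

  val numOfPartition = 6

  // Passing the partitioner straight to groupByKey writes the grouped data
  // directly into the target partitioning, so the separate partitionBy (and
  // its second shuffle) goes away.
  val links = sc.textFile("c:/Download/web-Google.txt")
    .filter(!_.contains("#"))
    .map { line =>
      val part = line.split("\t")
      (part(0).toInt, part(1).toInt)
    }
    .groupByKey(new HashPartitioner(numOfPartition))
    .persist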


Hopefully this is helpful for understanding how your stages & shuffles
correspond to your code.

Imran



On Mon, Feb 23, 2015 at 3:35 PM, Shuai Zheng <szheng.c...@gmail.com> wrote:

> This also triggers an interesting question: how can I do this locally in
> code if I want to? For example: I have RDDs A and B, each with some
> partitioning. If I want to join A to B, I might just want to do a map-side
> join (although B itself might be big, B's local partition is known to be
> small enough to fit in memory). How can I access another RDD's local
> partition inside the mapPartitions method? Is there any way to do this in
> Spark?
>
>
>
> *From:* Shao, Saisai [mailto:saisai.s...@intel.com]
> *Sent:* Monday, February 23, 2015 3:13 PM
> *To:* Shuai Zheng
> *Cc:* user@spark.apache.org
> *Subject:* RE: Union and reduceByKey will trigger shuffle even same
> partition?
>
>
>
> If you call reduceByKey(), internally Spark will introduce a shuffle
> operation, no matter whether the data is already partitioned locally; Spark
> itself does not know the data is already well partitioned.
>
>
>
> So if you want to avoid a shuffle, you have to write the code explicitly to
> avoid it, from my understanding. You can call mapPartitions to get a
> partition of data and reduce by key locally with your own logic.
>
>
>
> Thanks
>
> Saisai
>
>
>
> *From:* Shuai Zheng [mailto:szheng.c...@gmail.com]
>
> *Sent:* Monday, February 23, 2015 12:00 PM
> *To:* user@spark.apache.org
> *Subject:* Union and reduceByKey will trigger shuffle even same partition?
>
>
>
> Hi All,
>
>
>
> I am running a simple page rank program, but it is slow. I dug in, and part
> of the reason is that a shuffle happens when I call a union, even though
> both RDDs share the same partitioner:
>
>
>
> Below is my test code in spark shell:
>
>
>
> import org.apache.spark.HashPartitioner
>
> sc.getConf.set("spark.serializer",
>   "org.apache.spark.serializer.KryoSerializer")
>
> val beta = 0.8
> val numOfPartition = 6
>
> val links = sc.textFile("c:/Download/web-Google.txt")
>   .filter(!_.contains("#"))
>   .map { line =>
>     val part = line.split("\t")
>     (part(0).toInt, part(1).toInt)
>   }
>   .groupByKey
>   .partitionBy(new HashPartitioner(numOfPartition))
>   .persist
>
> var ranks = links.mapValues(_ => 1.0)
> var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist
>
> for (i <- 1 until 2) {
>   val contributions = links.join(ranks).flatMap {
>     case (pageId, (links, rank)) =>
>       links.map(dest => (dest, rank / links.size * beta))
>   }
>   *ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)*
> }
>
> ranks.lookup(1)
>
>
>
> In the above code, links joins ranks and should preserve the partitioning,
> and leakedMatrix also shares the same partitioner, so I expect no shuffle to
> happen on contributions.union(leakedMatrix), nor on the reduceByKey that
> follows. But in the end there is a shuffle write for every step: map,
> groupByKey, union, partitionBy, etc.
>
>
>
> I expect the shuffle to happen only once and everything after that to be a
> local operation, but the UI shows otherwise. Do I have a misunderstanding
> here?
>
>
>
>
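
As a footnote on the mapPartitions approach suggested in Saisai's message
above: a minimal sketch, with made-up data, of aggregating by key inside each
partition when all values for a key already live in a single partition, so no
shuffle is needed:

  import org.apache.spark.HashPartitioner
  import scala.collection.mutable

  // Made-up pairs, partitioned so every occurrence of a key is in one
  // partition.
  val pairs = sc.parallelize(Seq((1, 1.0), (2, 2.0), (1, 3.0)))
    .partitionBy(new HashPartitioner(4))

  // Sum values per key within each partition; preservesPartitioning = true
  // keeps the existing partitioner because the keys are unchanged.
  val summed = pairs.mapPartitions({ iter =>
    val acc = mutable.Map.empty[Int, Double].withDefaultValue(0.0)
    iter.foreach { case (k, v) => acc(k) += v }
    acc.iterator
  }, preservesPartitioning = true)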
