I don't have the context of this book, but AFAIK union will not trigger a 
shuffle, as it just puts the partitions together; the operator reduceByKey() 
is what will actually trigger the shuffle.

Thanks
Jerry
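That point can be sketched with a toy, non-Spark model (my own illustration, not Spark internals): treat an "RDD" as just a list of partitions. union appends the partition lists without moving any records, while reduceByKey has to bring equal keys together, which in general means moving records between partitions.

```scala
// Toy model: an "RDD" is a list of partitions, each a list of (key, value).
val rddA: Seq[Seq[(Int, Double)]] = Seq(Seq((1, 0.5)), Seq((2, 0.3)))
val rddB: Seq[Seq[(Int, Double)]] = Seq(Seq((1, 0.2)), Seq((3, 0.1)))

// union: just append the partition lists -- no record moves anywhere.
val unioned: Seq[Seq[(Int, Double)]] = rddA ++ rddB

// reduceByKey: equal keys must be brought together across partitions
// before they can be summed -- this regrouping is what a shuffle does.
val reduced: Map[Int, Double] =
  unioned.flatten.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

println(unioned.size)  // 4 partitions: 2 + 2, untouched by the union
</```>
```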

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:26 PM
To: Shao, Saisai
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

In the book of learning spark:

[inline image: excerpt from Learning Spark]

So does this mean that no shuffle happens across the network, but a shuffle is 
still done locally? Even if that is the case, why would union trigger a 
shuffle? I thought union would just append the RDDs together.

From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Monday, February 23, 2015 3:13 PM
To: Shuai Zheng
Cc: user@spark.apache.org
Subject: RE: Union and reduceByKey will trigger shuffle even same partition?

If you call reduceByKey(), Spark will internally introduce a shuffle 
operation, no matter whether the data is already partitioned locally; Spark 
itself does not know that the data is already well partitioned.

So if you want to avoid the shuffle, you have to write the code explicitly to 
avoid it, from my understanding. You can call mapPartitions to get a 
partition of data and reduce by key locally with your own logic.
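A minimal sketch of that suggestion, with the per-partition logic written as a pure function over plain iterators (the function name and types are my own, not from the thread). With Spark one would pass it to rdd.mapPartitions; this is only correct if every key already lives in a single partition.

```scala
import scala.collection.mutable

// Reduce-by-key restricted to one partition's data. In Spark this would be
// applied via rdd.mapPartitions(localReduceByKey) to avoid a shuffle --
// valid only when each key occurs in exactly one partition.
def localReduceByKey(iter: Iterator[(Int, Double)]): Iterator[(Int, Double)] = {
  val acc = mutable.Map.empty[Int, Double]
  for ((k, v) <- iter)
    acc(k) = acc.getOrElse(k, 0.0) + v
  acc.iterator
}

// One partition's records; key 1 appears twice and is summed locally.
val partition = Iterator((1, 0.25), (2, 0.5), (1, 0.75))
val reduced = localReduceByKey(partition).toMap
```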

Thanks
Saisai

From: Shuai Zheng [mailto:szheng.c...@gmail.com]
Sent: Monday, February 23, 2015 12:00 PM
To: user@spark.apache.org
Subject: Union and reduceByKey will trigger shuffle even same partition?

Hi All,

I am running a simple PageRank program, but it is slow. I dug out that part of 
the reason is a shuffle happening when I call a union action, even though both 
RDDs share the same partitioner:

Below is my test code in the Spark shell:

import org.apache.spark.HashPartitioner

sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val beta = 0.8
val numOfPartition = 6
val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map { line =>
    val part = line.split("\t")
    (part(0).toInt, part(1).toInt)
  }
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist
var ranks = links.mapValues(_ => 1.0)
var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

for (i <- 1 until 2) {
  val contributions = links.join(ranks).flatMap {
    case (pageId, (links, rank)) =>
      links.map(dest => (dest, rank / links.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}
ranks.lookup(1)

In the above code, links joins ranks and should preserve the partitioning, 
and leakedMatrix also shares the same partitioner, so I expect no shuffle to 
happen on contributions.union(leakedMatrix), nor on the reduceByKey that 
follows. But in the end there is a shuffle write for all steps: map, 
groupByKey, union, partitionBy, etc.

I expected the shuffle to happen only once, with everything after that being a 
local operation, but the screen shows otherwise. Do I have any 
misunderstanding here?

[inline image: Spark UI screenshot]
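One way to check the "same partition" intuition in the question above: with a HashPartitioner, a key's partition index depends only on the key's hashCode and the partition count, so two RDDs partitioned by the same HashPartitioner place equal keys at the same index. The sketch below mimics that placement rule in plain Scala (it imitates, rather than calls, Spark's HashPartitioner):

```scala
// Mimics hash-partitioner placement: a non-negative key.hashCode modulo
// numPartitions. Equal keys always map to the same partition index,
// regardless of which RDD they came from.
def partitionOf(key: Any, numPartitions: Int): Int = {
  val raw = key.hashCode % numPartitions
  if (raw < 0) raw + numPartitions else raw
}

val numOfPartition = 6
// Same key, same slot -- the basis for expecting a shuffle-free
// union + local reduce over co-partitioned RDDs.
val sameSlot = partitionOf(42, numOfPartition) == partitionOf(42, numOfPartition)
```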
