Hi All,

 

I am running a simple PageRank program, but it is slow. Part of the reason I dug out is that a shuffle happens when I call union, even though both RDDs share the same partitioner:

 

Below is my test code in the Spark shell:

 

import org.apache.spark.HashPartitioner

// Note: spark.serializer has to be set on the SparkConf before the
// SparkContext is created; sc.getConf returns a copy, so this call does not
// change the running context.
sc.getConf.set("spark.serializer",
  "org.apache.spark.serializer.KryoSerializer")

val beta = 0.8
val numOfPartition = 6

// Adjacency list: drop comment lines, parse "src\tdst" pairs, group the
// outgoing links per page, and hash-partition the result once.
val links = sc.textFile("c:/Download/web-Google.txt")
  .filter(!_.contains("#"))
  .map { line =>
    val part = line.split("\t")
    (part(0).toInt, part(1).toInt)
  }
  .groupByKey
  .partitionBy(new HashPartitioner(numOfPartition))
  .persist

// Initial rank of 1.0 per page, plus the constant (1 - beta) "leak" term.
var ranks = links.mapValues(_ => 1.0)
val leakedMatrix = links.mapValues(_ => 1.0 - beta).persist

for (i <- 1 until 2) {
  // Each page spreads beta * rank evenly over its outgoing links.
  val contributions = links.join(ranks).flatMap {
    case (pageId, (outLinks, rank)) =>
      outLinks.map(dest => (dest, rank / outLinks.size * beta))
  }
  ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
}

ranks.lookup(1)

 

In the code above, links joins ranks and should preserve the partitioning, and leakedMatrix shares the same partitioner, so I expect no shuffle to happen on contributions.union(leakedMatrix), nor on the reduceByKey that follows. But in the end there is shuffle write for all steps: map, groupByKey, union, partitionBy, etc.

 

I expect the shuffle to happen only once, with everything after that being local operations, but the UI shows otherwise. Do I have any misunderstanding here?
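
In case it helps, the lineage can also be dumped to see where the shuffle boundaries sit (a sketch, same session as above):

// Sketch: toDebugString prints the RDD lineage; the indentation marks stage
// (shuffle) boundaries, which should show which of the steps above really
// re-shuffle.
println(ranks.toDebugString)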

 


