Hi All,
I am running a simple PageRank program, but it is slow. I found that part of the reason is that a shuffle happens when I call union, even though both RDDs share the same partitioning.

Below is my test code, run in the Spark shell:

    import org.apache.spark.HashPartitioner

    sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val beta = 0.8
    val numOfPartition = 6

    // Parse the edge list (skipping comment lines), group the outgoing links
    // per page, then hash-partition by page id and cache the result.
    val links = sc.textFile("c:/Download/web-Google.txt")
      .filter(!_.contains("#"))
      .map(line => { val part = line.split("\t"); (part(0).toInt, part(1).toInt) })
      .groupByKey
      .partitionBy(new HashPartitioner(numOfPartition))
      .persist

    var ranks = links.mapValues(_ => 1.0)
    var leakedMatrix = links.mapValues(_ => (1.0 - beta)).persist

    for (i <- 1 until 2) {
      // Each page sends rank / outDegree * beta to every page it links to.
      val contributions = links.join(ranks).flatMap {
        case (pageId, (links, rank)) =>
          links.map(dest => (dest, rank / links.size * beta))
      }
      ranks = contributions.union(leakedMatrix).reduceByKey(_ + _)
    }

    ranks.lookup(1)

In the code above, links is joined with ranks, which I expect to preserve the partitioning, and leakedMatrix shares the same partitioning as well. So I expect no shuffle on contributions.union(leakedMatrix), nor on the reduceByKey that follows it.

However, I see shuffle write for all steps: map, groupByKey, union, partitionBy, etc. I expected the shuffle to happen only once, with everything after that being a local operation, but the screen shows otherwise. Do I have a misunderstanding here?
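
One way to check this expectation in the same shell is to print the partitioner that Spark records for each intermediate RDD. The following is only a diagnostic sketch, not a fix: it assumes the shell-provided `sc`, and it replaces the web-Google.txt input with a tiny made-up edge list so that it can be pasted on its own. The name `joined` and the sample edges are introduced here for illustration.

```scala
// Diagnostic sketch for spark-shell (relies on the shell-provided `sc`).
// The four edges below are made-up sample data, not the real input file.
import org.apache.spark.HashPartitioner

val beta = 0.8
val numOfPartition = 6

val edges = sc.parallelize(Seq((1, 2), (1, 3), (2, 3), (3, 1)))
val links = edges.groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist
val ranks = links.mapValues(_ => 1.0)
val leakedMatrix = links.mapValues(_ => 1.0 - beta).persist

val joined = links.join(ranks)
val contributions = joined.flatMap { case (_, (dests, rank)) =>
  dests.map(dest => (dest, rank / dests.size * beta))
}

// RDD.partitioner is an Option[Partitioner]. None means Spark has no record
// of how the keys are laid out for that RDD.
println(s"links:         ${links.partitioner}")
println(s"ranks:         ${ranks.partitioner}")
println(s"leakedMatrix:  ${leakedMatrix.partitioner}")
println(s"joined:        ${joined.partitioner}")
println(s"contributions: ${contributions.partitioner}")

// Print the lineage of the final RDD; shuffle dependencies show up as
// ShuffledRDD entries at a new indentation level.
println(contributions.union(leakedMatrix).reduceByKey(_ + _).toDebugString)
```

If any of these prints `None`, Spark cannot treat that RDD as co-partitioned with leakedMatrix, so the by-key step that follows has to shuffle. Comparing the printed values line by line should show at which transformation the partitioning stops being tracked.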