I think some RDD APIs like zipPartitions or others can do this as you wanted. I might check the docs.
Thanks Jerry From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 1:35 PM To: Shao, Saisai Cc: user@spark.apache.org Subject: RE: Union and reduceByKey will trigger shuffle even same partition? This also trigger an interesting question: how can I do this locally by code if I want. For example: I have RDD A and B, which has some partition, then if I want to join A to B, I might just want to do a mapper side join (although B itself might be big, but B's local partition is known small enough put in memory), how can I access other RDD's local partition in the mapParitition method? Is it anyway to do this in Spark? From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: Monday, February 23, 2015 3:13 PM To: Shuai Zheng Cc: user@spark.apache.org<mailto:user@spark.apache.org> Subject: RE: Union and reduceByKey will trigger shuffle even same partition? If you call reduceByKey(), internally Spark will introduce a shuffle operations, not matter the data is already partitioned locally, Spark itself do not know the data is already well partitioned. So if you want to avoid Shuffle, you have to write the code explicitly to avoid this, from my understanding. You can call mapParitition to get a partition of data and reduce by key locally by your logic. Thanks Saisai From: Shuai Zheng [mailto:szheng.c...@gmail.com] Sent: Monday, February 23, 2015 12:00 PM To: user@spark.apache.org<mailto:user@spark.apache.org> Subject: Union and reduceByKey will trigger shuffle even same partition? Hi All, I am running a simple page rank program, but it is slow. And I dig out part of reason is there is shuffle happen when I call an union action even both RDD share the same partition: Below is my test code in spark shell: import org.apache.spark.HashPartitioner sc.getConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") val beta = 0.8 val numOfPartition = 6 val links = sc.textFile("c:/Download/web-Google.txt").filter(!_.contains("#")).map(line=>{val part=line.split("\t"); (part(0).toInt,part(1).toInt)}).groupByKey.partitionBy(new HashPartitioner(numOfPartition)).persist var ranks = links.mapValues(_ => 1.0) var leakedMatrix = links.mapValues(_ => (1.0-beta)).persist for (i <- 1 until 2) { val contributions = links.join(ranks).flatMap { case (pageId, (links, rank)) => links.map(dest => (dest, rank / links.size * beta)) } ranks = contributions.union(leakedMatrix).reduceByKey(_ + _) } ranks.lookup(1) In above code, links will join ranks and should preserve the partition, and leakedMatrix also share the same partition, so I expect there is no shuffle happen on the contributions.union(leakedMatrix), also on the coming reduceByKey after that. But finally there is shuffle write for all steps, map, groupByKey, Union, partitionBy, etc. I expect there should only happen once on the shuffle then all should local operation, but the screen shows not, do I have any misunderstanding here? [cid:image001.png@01D04F73.AFB2D330]