Hi all, I tested the *partitionBy* feature in a wordcount application, and I'm puzzled by what I observed. In this application, I created an RDD from some text files in HDFS (about 100GB in total), each of which has lines composed of words separated by the character "#". I wanted to count the occurrences of each distinct word. *All lines have the same contents, so the final result should be very small in bytes.* The code is as follows:
val text = sc.textFile(inputDir)
val tuples = text.flatMap(line => line.split("#"))
  .map((_, 1))
  .reduceByKey(_ + _)
tuples.collect.foreach { case (word, count) => println(word + " -> " + count) }

I submitted the application to a Spark cluster of 5 nodes and ran it in standalone mode. From the application UI <http://imgbin.org/index.php?page=image&id=20976>, we can see that the shuffles for *collect* and *reduceByKey* used little bandwidth (766.4KB for *collect*'s shuffle read and 961KB for *reduceByKey*'s shuffle write).

*However, the shuffle used quite a lot of bandwidth when I added partitionBy like this:*

val text = sc.textFile(inputDir)
val tuples = text.flatMap(line => line.split("#"))
  .map((_, 1))
  .partitionBy(new HashPartitioner(100))
  .reduceByKey(_ + _)
tuples.collect.foreach { case (word, count) => println(word + " -> " + count) }

From the application UI <http://imgbin.org/index.php?page=image&id=20977>, we can see that the shuffle read for *collect* is 2.8GB and the shuffle write for *map* is 3.5GB.

The *map* transformations run on the 5 nodes of the cluster because the HDFS blocks are distributed among them. A *map* transformation is applied to each element of the RDD locally on each node and should not require shuffling the resulting RDD. *So my first question is: why did the map stage write so much shuffle data (3.5GB) when I added partitionBy to the code?*

When *collect* is applied, it needs to collect the results, namely the (*word*, *totalCount*) tuples, from the 5 nodes to the driver. That process should use very little bandwidth, because all lines have the same contents like "AAA#BBB#CCC#DDD", which means the final result that *collect* retrieves should be very small in bytes (for example, hundreds of KB). *So my second question is: why did the collect action read so much shuffle data (2.8GB) when I added partitionBy to the code?*

*And the third question: when I add partitionBy to an RDD, it returns a new RDD.
Does that mean the RDD will be immediately shuffled across nodes to satisfy the supplied partitioner, or is the supplied partitioner merely a hint indicating how the RDD should be partitioned later?* Thanks.
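For what it's worth, here is how I have been poking at the third question (just a sketch, assuming the same `sc`, `inputDir`, and HashPartitioner import as in the code above). Since *partitionBy* is a transformation, inspecting the new RDD's partitioner and lineage via *toDebugString* before calling any action should show whether a shuffle stage has merely been recorded rather than executed:

```scala
import org.apache.spark.HashPartitioner

// Assumes an existing SparkContext `sc` and the same inputDir as above.
val partitioned = sc.textFile(inputDir)
  .flatMap(line => line.split("#"))
  .map((_, 1))
  .partitionBy(new HashPartitioner(100))

// No action has been called yet; these calls only inspect metadata/lineage,
// so no job is launched and no data moves across the network.
println(partitioned.partitioner)   // Some(org.apache.spark.HashPartitioner@...)
println(partitioned.toDebugString) // the lineage includes a ShuffledRDD
```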