These parameters in fact control the behavior on the reduce side, as in your
word count example.
Each reducer fetches the partitions assigned to it. It fetches the
corresponding partitions from the different mappers' output, and it processes
the data with your logic while fetching them. This memory area is a sort
buffer area, and depending on "spark.shuffle.spill" (memory only, or memory +
disk), Spark will use different implementations (AppendOnlyMap and
ExternalAppendOnlyMap) to handle it.
"spark.shuffle.memoryFraction" controls what fraction of the Java heap is used
as this sort buffer area.
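As a rough sketch of how these two settings could be applied, assuming an
older Spark 1.x release where both properties are still in effect and the
Spark jars are on the classpath. The application name and the 0.3 value are
made up for illustration, not recommendations:

```scala
import org.apache.spark.SparkConf

object ShuffleConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("wc-shuffle-conf-sketch") // hypothetical name
      // true: reduce-side aggregation may spill to disk; false: memory only
      .set("spark.shuffle.spill", "true")
      // fraction of the Java heap used for the shuffle buffer (illustrative value)
      .set("spark.shuffle.memoryFraction", "0.3")

    // Print the settings that would be handed to a SparkContext.
    println(conf.toDebugString)
  }
}
```

The same properties can also be passed on the command line with
spark-submit --conf instead of being set in code.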
You can find more information in this Jira:

Thanks Yong,
my script is pretty straightforward:
sc.textFile("/wc/input")
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
  .saveAsTextFile("/wc/out2") // both paths are HDFS
So if every shuffle write always goes to disk, what is the meaning of these
properties -


No problem.
From the mapper side, Spark is very similar to MapReduce; but on the reducer
fetching side, MR uses sort-merge while Spark uses a HashMap.
So keep in mind that on MR you get the data automatically sorted on the
reducer side, but not in Spark.
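To make the difference concrete, here is a small plain-Scala sketch (no Spark
involved; the object and method names are made up for illustration) of
hash-based aggregation. The counts come out correct, but the key order is
whatever the hash map happens to produce, so sorting has to be an explicit
extra step:

```scala
import scala.collection.mutable

object HashVsSortedAggregation {
  // Merge (word, count) records into a hash map, roughly what a
  // hash-based reducer does with the records it fetches.
  def hashAggregate(records: Seq[(String, Int)]): mutable.HashMap[String, Int] = {
    val counts = mutable.HashMap.empty[String, Int]
    for ((word, n) <- records) {
      counts(word) = counts.getOrElse(word, 0) + n
    }
    counts
  }

  def main(args: Array[String]): Unit = {
    val fetched = Seq(("spark", 1), ("hadoop", 2), ("spark", 3), ("apple", 1))
    val counts = hashAggregate(fetched)
    // Iteration order of a hash map is unspecified.
    println(counts.toSeq)
    // Sorted output needs an explicit sort.
    println(counts.toSeq.sortBy(_._1))
  }
}
```

In a real Spark job the equivalent explicit step would be something like
sortByKey() on the pair RDD after reduceByKey.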
Spark's performance comes from its caching ability and from smartly arranging
the tasks into stages. Intermediate data between stages is never stored in
HDFS, only on local disk. In MR, the intermediate data from one MR job to the
next is stored in HDFS. Spark also uses threads to run tasks, instead of
heavyweight processes as MR does.
Without caching, in my experience, Spark can be about 2x to 5x faster than an
MR job, depending on the job logic. If the data volume is small, Spark will be
even better, as a process is way more expensive than a thread in this case.
I didn't see your Spark script, so my guess is that you are using
"rdd.collect()", which will transfer the final result to the driver and dump
it to the console.
Thanks Yong,
That was the kind of explanation I was looking for. However, I have one doubt.
You write: "Imagine that you have 2 mappers reading the data; each mapper will
generate its (word, count) tuple output in segments. Spark always writes that
output to a local file (in fact, one file with different segments representing
different partitions)." If this is true, then Spark is very similar to Hadoop
MapReduce (disk IO between phases). With so many IOs after each stage, how does
Spark achieve the performance that it does compared to MapReduce? My other
doubt is about "The 2000 bytes sent to the driver are the final output
aggregated on the reducers' end and merged back to the driver." Which part of
our word count code takes care of this? And yes, there are only 273 distinct
words in the text, so that's not a surprise.
Thanks again,
Hope to get a reply.
I am not sure about the original explanation of shuffle write.
In the word count example, a shuffle is needed, because Spark has to group by
word (reduceByKey is the more accurate term here). Imagine that you have 2
mappers reading the data; each mapper will generate its (word, count) tuple
output in segments. Spark always writes that output to a local file (in fact,
one file with different segments representing different partitions).
As you can imagine, these segments will be small, as they only contain
(word, count of word) tuples. After each mapper generates this segmented file
for the different partitions, each reducer fetches the partitions belonging
to it.
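The flow described above can be imitated with plain Scala collections. This is
only a toy model under stated assumptions: no Spark and no files are involved,
the names (ShuffleSketch, mapSide, reduceSide, partitionOf) are invented for
the illustration, and the partitioning rule is a simple hash modulo stand-in:

```scala
object ShuffleSketch {
  type Segment = Map[String, Int]

  // Hypothetical stand-in for a hash partitioner:
  // non-negative hash code modulo the number of reducers.
  def partitionOf(word: String, numReducers: Int): Int =
    ((word.hashCode % numReducers) + numReducers) % numReducers

  // Map side: count words locally, then split the counts into
  // one segment per reducer (the "segmented file" in the text).
  def mapSide(lines: Seq[String], numReducers: Int): Map[Int, Segment] = {
    val combined = lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => (word, occurrences.size) }
    combined.groupBy { case (word, _) => partitionOf(word, numReducers) }
  }

  // Reduce side: fetch this reducer's segment from every mapper
  // output and merge the partial counts.
  def reduceSide(reducerId: Int, mapOutputs: Seq[Map[Int, Segment]]): Segment =
    mapOutputs
      .flatMap(_.getOrElse(reducerId, Map.empty[String, Int]).toSeq)
      .groupBy(_._1)
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val numReducers = 2
    val mapper1 = mapSide(Seq("a b a", "c a"), numReducers)
    val mapper2 = mapSide(Seq("b b c"), numReducers)
    val result = (0 until numReducers)
      .map(r => reduceSide(r, Seq(mapper1, mapper2)))
      .reduce(_ ++ _)
    println(result)
  }
}
```

Each mapper's segments hold at most one entry per distinct word, which is why
the shuffle write stays tiny even when the input is large.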
In your job summary, if your source is a text file, your data corresponds to 2
HDFS blocks, or 2x256M. There are 2 tasks concurrently reading these 2
partitions, with about 2.5M lines of data processed in each partition.
The output of each partition is a shuffle write of 2.7K of data, which is the
size of the segment file generated, corresponding to all the unique words in
this partition and their counts. So the size is reasonable, at least to me.
The interesting number is 273 shuffle write records. I am not 100% sure of its
meaning. Does it mean that this partition has 273 unique words in these 2.5M
lines of data? That is kind of low, but I really don't have another
explanation for it.
If your final output shows a few hundred unique words, then that is it.
The 2000 bytes sent to the driver are the final output aggregated on the
reducers' end and merged back to the driver.

Hi Nicolae,
Thanks for the reply. To further clarify things:
sc.textFile is reading from HDFS, so shouldn't the file be read in such a way
that EACH executor works only on the local copy of the file part available to
it? In this case it is a ~4.64 GB file and the block size is 256 MB, so
approximately 19 partitions will be created and each task will run on 1
partition (which is what I am seeing in the stage logs). I also assume it will
read the file in such a way that each executor gets roughly the same amount of
data, so there shouldn't be any shuffling during the read at least.
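The figure of 19 can be checked with a small calculation. This is just the
arithmetic, assuming one partition per 256 MB block with a partial last block;
the object and method names are made up for the illustration:

```scala
object PartitionEstimate {
  // Number of block-sized partitions needed to cover the file,
  // counting a partial last block as one partition (ceiling division).
  def estimatePartitions(fileBytes: Long, blockBytes: Long): Long =
    (fileBytes + blockBytes - 1) / blockBytes

  def main(args: Array[String]): Unit = {
    val mb = 1024L * 1024L
    val fileBytes = (4.64 * 1024 * mb).toLong // ~4.64 GB
    val blockBytes = 256L * mb
    // 4.64 GB / 256 MB = 18.56, so 19 partitions
    println(estimatePartitions(fileBytes, blockBytes))
  }
}
```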
During stage 0 (sc.textFile -> flatMap -> map), this is the output I am seeing
for every task:

Index  ID  Attempt  Status   Locality Level  Executor ID / Host  Launch Time  Duration  GC Time  Input Size / Records          Write Time  Shuffle Write Size / Records  Errors
0      44  0        SUCCESS  NODE_LOCAL      1 /                 13:57:24     14 s      0.2 s    256.0 MB (hadoop) / 2595161               2.7 KB / 273
1      45  0        SUCCESS  NODE_LOCAL      2 /                 13:57:24     13 s      0.2 s    256.0 MB (hadoop) / 2595176               2.7 KB / 273

I have the following questions:
1) What exactly is the 2.7 KB of shuffle write?
2) Is this 2.7 KB of shuffle write local to that executor?
3) In the executor's log I am seeing 2000 bytes of results sent to the driver.
If this number were much greater than 2000 bytes, such that it did not fit in
the executor's memory, would the shuffle write increase?
4) For word count, 256 MB of data is a substantial amount of text. How come
the result for this stage is only 2000 bytes? It should send every word with
its respective count, so for a 256 MB input shouldn't this result be much
bigger?
I hope I am clear this time.
Hope to get a reply,

My understanding is like this:
The first step is that a number of partitions is determined, p of them. You
can give a hint on this.
Then the nodes which will load the p partitions are chosen, that is n nodes
(where n<=p).

At roughly the same time or not, the n nodes start opening different sections
of the file, the physical equivalent of the partitions: for instance in HDFS
they would do an open and a seek, I guess, and just read from the stream
there, converting to whatever the InputFormat dictates.

The shuffle can only be the part where a node opens an HDFS file, for
instance, but does not have a local replica of the blocks it needs to read
(those pertaining to its assigned partitions). So it needs to pick them up
from nodes which do have replicas of that data.

After the blocks are read into memory, flatMap and map are local computations
generating new RDDs, and in the end the result is sent to the driver (whatever
the terminating computation does on the RDD, like the result of reduce, or the
side effects of rdd.foreach, etc.).

Maybe you can share more of your context if still unclear.
I just made assumptions to give clarity on a similar thing.


Thanks Nicolae,
So in my case all executors are sending results back to the driver, and
"shuffle is just sending out the textFile to distribute the partitions". Could
you please elaborate on this? What exactly is in this file?

2- the end results are sent back to the driver; the shuffles are transmission 
of intermediate results between nodes such as the -> which are all intermediate 

More precisely, since flatMap and map are narrow dependencies, meaning they can 
usually happen on the local node, I bet shuffle is just sending out the 
textFile to a few nodes to distribute the partitions.

Hi All,

I tried running the Spark word count and I have a couple of questions.

I am analyzing stage 0, i.e.
 sc.textFile -> flatMap -> map (word count example)

1) In the stage logs under the application UI details, for every task I am
seeing Shuffle Write as 2.7 KB.
Question: how can I know where this task wrote its data? For example, how many
bytes went to which executor?

2) In the executor's log, when I look for the same task, it says 2000 bytes of
result were sent to the driver. My question is:
if the results were sent directly to the driver, what is this shuffle write?





