Hi Daniel

        Thanks for your reply, While I think for reduceByKey, it will also do 
map side combine, thus extra the result is the same, say, for each partition, 
one entry per distinct word. In my case with javaserializer,  240MB dataset 
yield to around 70MB shuffle data. Only that shuffle Spill ( memory ) is 
abnormal, and sounds to me should not trigger at all. And, by the way, this 
behavior only occurs in map out side, on reduce / shuffle fetch side, this 
strange behavior won't happen.

Best Regards,
Raymond Liu

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com] 

I have no idea why shuffle spill is so large. But this might make it smaller:

val addition = (a: Int, b: Int) => a + b
val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for each 
partition, instead of one entry per word occurrence.

On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond <raymond....@intel.com> wrote:
Hi  Patrick

        I am just doing simple word count , the data is generated by hadoop 
random text writer.

        This seems to me not quite related to compress , If I turn off compress 
on shuffle, the metrics is something like below for the smaller 240MB Dataset.


Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
10      sr437:48527     35 s    8       0       8       0.0 B   2.5 MB  2.2 GB  
1291.2 KB
12      sr437:46077     34 s    8       0       8       0.0 B   2.5 MB  1822.6 
MB       1073.3 KB
13      sr434:37896     31 s    8       0       8       0.0 B   2.4 MB  1099.2 
MB       621.2 KB
15      sr438:52819     31 s    8       0       8       0.0 B   2.5 MB  1898.8 
MB       1072.6 KB
16      sr434:37103     32 s    8       0       8       0.0 B   2.4 MB  1638.0 
MB       1044.6 KB


        And the program pretty simple:

val files = sc.textFile(args(1))
val words = files.flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

val wordsCount = wordsPair.reduceByKey(_ + _)
val count = wordsCount.count()

println("Number of words = " + count)


Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com]

Could you explain more what your job is doing and what data types you are 
using? These numbers alone don't necessarily indicate something is wrong. The 
relationship between the in-memory and on-disk shuffle amount is definitely a 
bit strange, the data gets compressed when written to disk, but unless you have 
a weird dataset (E.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond <raymond....@intel.com> wrote:
Hi


        I am running a simple word count program on spark standalone cluster. 
The cluster is made up of 6 node, each run 4 worker and each worker own 10G 
memory and 16 core thus total 96 core and 240G memory. ( well, also used to 
configed as 1 worker with 40G memory on each node )

        I run a very small data set (2.4GB on HDFS on total) to confirm the 
problem here as below:

        As you can read from part of the task metrics as below, I noticed that 
the shuffle spill part of metrics indicate that there are something wrong.

Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
0       sr437:42139     29 s    4       0       4       0.0 B   4.3 MB  23.6 GB 
4.3 MB
1       sr433:46935     1.1 min 4       0       4       0.0 B   4.2 MB  19.0 GB 
3.4 MB
10      sr436:53277     26 s    4       0       4       0.0 B   4.3 MB  25.6 GB 
4.6 MB
11      sr437:58872     32 s    4       0       4       0.0 B   4.3 MB  25.0 GB 
4.4 MB
12      sr435:48358     27 s    4       0       4       0.0 B   4.3 MB  25.1 GB 
4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x of the 
actual shuffle data and Shuffle Spill (Disk), and also it seems to me that by 
no means that the spill should trigger, since the memory is not used up at all.

To verify that I further reduce the data size to 240MB on total

And here is the result:


Executor ID     Address Task Time       Total Tasks     Failed Tasks    
Succeeded Tasks Shuffle Read    Shuffle Write   Shuffle Spill (Memory)  Shuffle 
Spill (Disk)
0       sr437:50895     15 s    4       0       4       0.0 B   703.0 KB        
80.0 MB 43.2 KB
1       sr433:50207     17 s    4       0       4       0.0 B   704.7 KB        
389.5 MB        90.2 KB
10      sr436:56352     16 s    4       0       4       0.0 B   700.9 KB        
814.9 MB        181.6 KB
11      sr437:53099     15 s    4       0       4       0.0 B   689.7 KB        
0.0 B   0.0 B
12      sr435:48318     15 s    4       0       4       0.0 B   702.1 KB        
427.4 MB        90.7 KB
13      sr433:59294     17 s    4       0       4       0.0 B   704.8 KB        
779.9 MB        180.3 KB

Nothing prevent spill from happening.

Now, there seems to me that there must be something wrong with the spill 
trigger codes.

So anyone encounter this issue?  By the way, I am using latest trunk code.


Best Regards,
Raymond Liu

Reply via email to