Re: Shuffle Spill Issue

2014-04-30 Thread Daniel Darabos
Whoops, you are right. Sorry for the misinformation. Indeed reduceByKey
just calls combineByKey:

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}

(I think I confused reduceByKey with groupByKey.)
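
A minimal sketch of the difference, assuming the usual pair-RDD API (not code from this thread):

val counts  = wordsPair.reduceByKey(_ + _)             // map-side combine: one record per distinct word per partition
val grouped = wordsPair.groupByKey().mapValues(_.sum)  // no map-side combine: every (word, 1) pair crosses the shuffle

reduceByKey (via combineByKey) merges values before the shuffle, which is why the shuffle write stays small; groupByKey ships every occurrence.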


On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond raymond@intel.com wrote:

 Hi Daniel

 Thanks for your reply. I think reduceByKey will also do a map-side
 combine, so the result is the same: for each partition, one entry per
 distinct word. In my case, with the Java serializer, the 240MB dataset
 yields around 70MB of shuffle data. It is only the Shuffle Spill (Memory)
 that is abnormal, and it sounds to me like it should not be triggered at
 all. By the way, this behavior only occurs on the map output side; on the
 reduce / shuffle fetch side this strange behavior does not happen.

 Best Regards,
 Raymond Liu

 From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]

 I have no idea why shuffle spill is so large. But this might make it
 smaller:

 val addition = (a: Int, b: Int) => a + b
 val wordsCount = wordsPair.combineByKey(identity, addition, addition)

 This way only one entry per distinct word will end up in the shuffle for
 each partition, instead of one entry per word occurrence.

 On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond raymond@intel.com
 wrote:
 Hi  Patrick

 I am just doing a simple word count; the data is generated by the
 Hadoop random text writer.

 This does not seem to me to be related to compression. If I turn off
 compression on the shuffle, the metrics look something like the below for
 the smaller 240MB dataset.


 Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
 12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
 13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
 15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
 16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB


 And the program is pretty simple:

 val files = sc.textFile(args(1))
 val words = files.flatMap(_.split(" "))
 val wordsPair = words.map(x => (x, 1))

 val wordsCount = wordsPair.reduceByKey(_ + _)
 val count = wordsCount.count()

 println("Number of words = " + count)


 Best Regards,
 Raymond Liu

 From: Patrick Wendell [mailto:pwend...@gmail.com]

 Could you explain more about what your job is doing and what data types
 you are using? These numbers alone don't necessarily indicate something is
 wrong. The relationship between the in-memory and on-disk shuffle amounts
 is definitely a bit strange: the data gets compressed when written to
 disk, but unless you have a weird dataset (e.g. all zeros) I wouldn't
 expect it to compress _that_ much.

 On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com
 wrote:
 Hi


 I am running a simple word count program on a Spark standalone
 cluster. The cluster is made up of 6 nodes; each runs 4 workers, and each
 worker owns 10G of memory and 16 cores, for a total of 96 cores and 240G
 of memory. (It also used to be configured as 1 worker with 40G of memory
 on each node.)

 I ran a very small data set (2.4GB on HDFS in total) to confirm the
 problem, as below:

 As you can read from the part of the task metrics shown below, I noticed
 that the shuffle spill metrics indicate that something is wrong.

 Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
 1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
 10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
 11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
 12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB


 You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x
 the actual shuffle data and the Shuffle Spill (Disk), and it also seems to
 me that the spill should not be triggered at all, since memory is nowhere
 near used up.

 To verify this, I further reduced the data size to 240MB in total.

 And here is the result:


 Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
 0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
 1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
 

RE: Shuffle Spill Issue

2014-04-29 Thread Liu, Raymond
Hi Daniel

Thanks for your reply. I think reduceByKey will also do a map-side combine, 
so the result is the same: for each partition, one entry per distinct word. 
In my case, with the Java serializer, the 240MB dataset yields around 70MB 
of shuffle data. It is only the Shuffle Spill (Memory) that is abnormal, and 
it sounds to me like it should not be triggered at all. By the way, this 
behavior only occurs on the map output side; on the reduce / shuffle fetch 
side this strange behavior does not happen.
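
A minimal sketch of how the spill-related settings could be sanity-checked, assuming the usual spark.shuffle.* properties apply to this trunk build (the property names below are assumptions, not taken from this thread):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("WordCount")
  .set("spark.shuffle.spill", "false")         // assumed property: disable map-side spilling entirely, then compare the metrics
  .set("spark.shuffle.memoryFraction", "0.3")  // assumed property: heap fraction the shuffle aggregation buffers may use before spilling
val sc = new SparkContext(conf)

If the spill metrics stay high even with spilling disabled, that would point at the metrics themselves rather than the spill trigger.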

Best Regards,
Raymond Liu

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com] 

I have no idea why shuffle spill is so large. But this might make it smaller:

val addition = (a: Int, b: Int) => a + b
val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word will end up in the shuffle for each 
partition, instead of one entry per word occurrence.

On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond raymond@intel.com wrote:
Hi  Patrick

        I am just doing a simple word count; the data is generated by the Hadoop
random text writer.

        This does not seem to me to be related to compression. If I turn off
compression on the shuffle, the metrics look something like the below for the
smaller 240MB dataset.


Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB


        And the program is pretty simple:

val files = sc.textFile(args(1))
val words = files.flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

val wordsCount = wordsPair.reduceByKey(_ + _)
val count = wordsCount.count()

println("Number of words = " + count)


Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com]

Could you explain more about what your job is doing and what data types you are 
using? These numbers alone don't necessarily indicate something is wrong. The 
relationship between the in-memory and on-disk shuffle amounts is definitely a 
bit strange: the data gets compressed when written to disk, but unless you have 
a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com wrote:
Hi


        I am running a simple word count program on a Spark standalone cluster. 
The cluster is made up of 6 nodes; each runs 4 workers, and each worker owns 10G 
of memory and 16 cores, for a total of 96 cores and 240G of memory. (It also used 
to be configured as 1 worker with 40G of memory on each node.)

        I ran a very small data set (2.4GB on HDFS in total) to confirm the 
problem, as below:

        As you can read from the part of the task metrics shown below, I noticed 
that the shuffle spill metrics indicate that something is wrong.

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the 
actual shuffle data and the Shuffle Spill (Disk), and it also seems to me that 
the spill should not be triggered at all, since memory is nowhere near used up.

To verify this, I further reduced the data size to 240MB in total.

And here is the result:


Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
10 | sr436:56352 | 16 s | 4 | 0 | 4 | 0.0 B | 700.9 KB | 814.9 MB | 181.6 KB
11 | sr437:53099 | 15 s | 4 | 0 | 4 | 0.0 B | 689.7 KB | 0.0 B | 0.0 B
12 | sr435:48318 | 15 s | 4 | 0 | 4 | 0.0 B | 702.1 KB | 427.4 MB | 90.7 KB
13 | sr433:59294 | 17 s | 4 | 0 | 4 | 0.0 B | 704.8 KB | 779.9 MB | 180.3 KB

Shuffle Spill Issue

2014-04-28 Thread Liu, Raymond
Hi


I am running a simple word count program on a Spark standalone cluster. 
The cluster is made up of 6 nodes; each runs 4 workers, and each worker owns 10G 
of memory and 16 cores, for a total of 96 cores and 240G of memory. (It also used 
to be configured as 1 worker with 40G of memory on each node.)

I ran a very small data set (2.4GB on HDFS in total) to confirm the 
problem, as below:

As you can read from the part of the task metrics shown below, I noticed that 
the shuffle spill metrics indicate that something is wrong.

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the 
actual shuffle data and the Shuffle Spill (Disk), and it also seems to me that 
the spill should not be triggered at all, since memory is nowhere near used up.

To verify this, I further reduced the data size to 240MB in total.

And here is the result:


Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
10 | sr436:56352 | 16 s | 4 | 0 | 4 | 0.0 B | 700.9 KB | 814.9 MB | 181.6 KB
11 | sr437:53099 | 15 s | 4 | 0 | 4 | 0.0 B | 689.7 KB | 0.0 B | 0.0 B
12 | sr435:48318 | 15 s | 4 | 0 | 4 | 0.0 B | 702.1 KB | 427.4 MB | 90.7 KB
13 | sr433:59294 | 17 s | 4 | 0 | 4 | 0.0 B | 704.8 KB | 779.9 MB | 180.3 KB

Nothing prevents the spill from happening.

Now it seems to me that there must be something wrong with the spill trigger 
code.

Has anyone else encountered this issue? By the way, I am using the latest trunk code.


Best Regards,
Raymond Liu


RE: Shuffle Spill Issue

2014-04-28 Thread Liu, Raymond
Hi  Patrick

I am just doing a simple word count; the data is generated by the Hadoop 
random text writer.

This does not seem to me to be related to compression. If I turn off compression 
on the shuffle, the metrics look something like the below for the smaller 240MB dataset.


Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB


And the program is pretty simple:

val files = sc.textFile(args(1))
val words = files.flatMap(_.split(" "))
val wordsPair = words.map(x => (x, 1))

val wordsCount = wordsPair.reduceByKey(_ + _)
val count = wordsCount.count()

println("Number of words = " + count)
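
For reference, a minimal sketch of how shuffle compression can be turned off, assuming the standard spark.shuffle.compress property (the exact way it was set for this run is not shown in the thread):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("WordCount")
  .set("spark.shuffle.compress", "false")        // do not compress map output files
  .set("spark.shuffle.spill.compress", "false")  // assumed property: also leave spilled data uncompressed
val sc = new SparkContext(conf)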


Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com] 

Could you explain more about what your job is doing and what data types you are 
using? These numbers alone don't necessarily indicate something is wrong. The 
relationship between the in-memory and on-disk shuffle amounts is definitely a 
bit strange: the data gets compressed when written to disk, but unless you have 
a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond raymond@intel.com wrote:
Hi


        I am running a simple word count program on a Spark standalone cluster. 
The cluster is made up of 6 nodes; each runs 4 workers, and each worker owns 10G 
of memory and 16 cores, for a total of 96 cores and 240G of memory. (It also used 
to be configured as 1 worker with 40G of memory on each node.)

        I ran a very small data set (2.4GB on HDFS in total) to confirm the 
problem, as below:

        As you can read from the part of the task metrics shown below, I noticed 
that the shuffle spill metrics indicate that something is wrong.

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB


You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the 
actual shuffle data and the Shuffle Spill (Disk), and it also seems to me that 
the spill should not be triggered at all, since memory is nowhere near used up.

To verify this, I further reduced the data size to 240MB in total.

And here is the result:


Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
10 | sr436:56352 | 16 s | 4 | 0 | 4 | 0.0 B | 700.9 KB | 814.9 MB | 181.6 KB
11 | sr437:53099 | 15 s | 4 | 0 | 4 | 0.0 B | 689.7 KB | 0.0 B | 0.0 B
12 | sr435:48318 | 15 s | 4 | 0 | 4 | 0.0 B | 702.1 KB | 427.4 MB | 90.7 KB
13 | sr433:59294 | 17 s | 4 | 0 | 4 | 0.0 B | 704.8 KB | 779.9 MB | 180.3 KB

Nothing prevents the spill from happening.

Now it seems to me that there must be something wrong with the spill trigger 
code.

Has anyone else encountered this issue? By the way, I am using the latest trunk code.


Best Regards,
Raymond Liu