Re: Shuffle Spill Issue
Whoops, you are right. Sorry for the misinformation. Indeed, reduceByKey just calls combineByKey:

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
      combineByKey[V]((v: V) => v, func, func, partitioner)
    }

(I think I confused reduceByKey with groupByKey.)

On Wed, Apr 30, 2014 at 2:55 AM, Liu, Raymond wrote:
> Hi Daniel
>
> Thanks for your reply. I think reduceByKey will also do a map-side
> combine, so the result is the same: for each partition, one entry per
> distinct word. In my case, with the Java serializer, a 240MB dataset
> yields around 70MB of shuffle data. Only the Shuffle Spill (Memory)
> metric is abnormal, and it seems to me the spill should not be
> triggered at all. By the way, this behavior only occurs on the map
> side; it does not happen on the reduce / shuffle fetch side.
>
> Best Regards,
> Raymond Liu
>
> From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]
>
> I have no idea why the shuffle spill is so large, but this might make
> it smaller:
>
>     val addition = (a: Int, b: Int) => a + b
>     val wordsCount = wordsPair.combineByKey(identity, addition, addition)
>
> This way only one entry per distinct word ends up in the shuffle for
> each partition, instead of one entry per word occurrence.
>
> On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond wrote:
> Hi Patrick
>
> I am just doing a simple word count; the data is generated by the
> Hadoop random text writer.
>
> This does not seem related to compression. If I turn off shuffle
> compression, the metrics look like the following for the smaller 240MB
> dataset:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
> 12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
> 13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
> 15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
> 16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB
>
> And the program is pretty simple:
>
>     val files = sc.textFile(args(1))
>     val words = files.flatMap(_.split(" "))
>     val wordsPair = words.map(x => (x, 1))
>
>     val wordsCount = wordsPair.reduceByKey(_ + _)
>     val count = wordsCount.count()
>
>     println("Number of words = " + count)
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more about what your job is doing and what data
> types you are using? These numbers alone don't necessarily indicate
> that something is wrong. The relationship between the in-memory and
> on-disk shuffle amounts is definitely a bit strange: the data gets
> compressed when written to disk, but unless you have a weird dataset
> (e.g. all zeros) I wouldn't expect it to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond wrote:
> Hi
>
> I am running a simple word count program on a Spark standalone
> cluster. The cluster is made up of 6 nodes; each runs 4 workers, and
> each worker owns 10GB of memory and 16 cores, for 96 cores and 240GB
> of memory in total. (It was also previously configured as 1 worker
> with 40GB of memory per node.)
>
> I ran a very small data set (2.4GB in total on HDFS) to confirm the
> problem, as below.
>
> As you can read from the task metrics below, the shuffle spill
> metrics indicate that something is wrong:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
> 1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
> 10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
> 11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
> 12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost
> 5000x the actual shuffle data and the Shuffle Spill (Disk), and it
> also seems to me that the spill should by no means be triggered, since
> memory is not used up at all.
>
> To verify that, I further reduced the data size to 240MB in total.
> Here is the result:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
> 1 | sr433:50207 | 17 s | 4
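Daniel's point above, that reduceByKey is just combineByKey with the same function used as both the value-merger and the combiner-merger, can be illustrated without Spark at all. The following is a minimal plain-Scala sketch of per-partition map-side combining; the names (combinePartition, MapSideCombineSketch) are illustrative, not Spark API:

```scala
// Plain-Scala model of the map-side combine that combineByKey performs
// inside one partition: one output entry per distinct key, no matter
// how many input records carry that key.
object MapSideCombineSketch {
  def combinePartition[K, V, C](
      records: Seq[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc + (k -> mergeValue(c, v))   // merge into existing combiner
        case None    => acc + (k -> createCombiner(v))  // first occurrence of this key
      }
    }

  def main(args: Array[String]): Unit = {
    val partition = Seq("a", "b", "a", "c", "a", "b").map(w => (w, 1))
    // reduceByKey(_ + _) corresponds to combineByKey(v => v, _ + _, _ + _):
    val combined = combinePartition[String, Int, Int](partition, v => v, _ + _)
    // Three entries (one per distinct word) would reach the shuffle,
    // instead of six (one per occurrence).
    println(combined.toSeq.sortBy(_._1))
  }
}
```

Because each partition collapses to one entry per distinct key before the shuffle, reduceByKey and Daniel's explicit combineByKey variant should produce the same amount of shuffle data, which matches Raymond's observation that the shuffle write itself looks fine.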
RE: Shuffle Spill Issue
Hi Daniel

Thanks for your reply. I think reduceByKey will also do a map-side combine, so the result is the same: for each partition, one entry per distinct word. In my case, with the Java serializer, a 240MB dataset yields around 70MB of shuffle data. Only the Shuffle Spill (Memory) metric is abnormal, and it seems to me the spill should not be triggered at all. By the way, this behavior only occurs on the map side; it does not happen on the reduce / shuffle fetch side.

Best Regards,
Raymond Liu

From: Daniel Darabos [mailto:daniel.dara...@lynxanalytics.com]

I have no idea why the shuffle spill is so large, but this might make it smaller:

    val addition = (a: Int, b: Int) => a + b
    val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word ends up in the shuffle for each partition, instead of one entry per word occurrence.

On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond wrote:

Hi Patrick

I am just doing a simple word count; the data is generated by the Hadoop random text writer.

This does not seem related to compression. If I turn off shuffle compression, the metrics look like the following for the smaller 240MB dataset:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB

And the program is pretty simple:

    val files = sc.textFile(args(1))
    val words = files.flatMap(_.split(" "))
    val wordsPair = words.map(x => (x, 1))

    val wordsCount = wordsPair.reduceByKey(_ + _)
    val count = wordsCount.count()

    println("Number of words = " + count)

Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com]

Could you explain more about what your job is doing and what data types you are using? These numbers alone don't necessarily indicate that something is wrong. The relationship between the in-memory and on-disk shuffle amounts is definitely a bit strange: the data gets compressed when written to disk, but unless you have a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond wrote:

Hi

I am running a simple word count program on a Spark standalone cluster. The cluster is made up of 6 nodes; each runs 4 workers, and each worker owns 10GB of memory and 16 cores, for 96 cores and 240GB of memory in total. (It was also previously configured as 1 worker with 40GB of memory per node.)

I ran a very small data set (2.4GB in total on HDFS) to confirm the problem, as below.

As you can read from the task metrics below, the shuffle spill metrics indicate that something is wrong:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB

You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the actual shuffle data and the Shuffle Spill (Disk), and it also seems to me that the spill should by no means be triggered, since memory is not used up at all.

To verify that, I further reduced the data size to 240MB in total. Here is the result:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
10 | sr436:56352 | 16 s | 4 | 0 | 4 | 0.0 B | 700.9 KB | 814.9 MB | 181.6 KB
11 | sr437:53099 | 15 s | 4 | 0 | 4 | 0.0 B | 689.7 KB | 0.0 B | 0.0 B
12 | sr435:48318 | 15 s | 4 | 0 | 4 | 0.0 B | 702.1 KB | 427.4 MB | 90.7 KB
13 | sr433:59294 | 17 s | 4 | 0 | 4 | 0.0 B | 704.8 KB | 779.9 MB | 180.3 KB

Nothing prevents spill from happening.
Re: Shuffle Spill Issue
I have no idea why the shuffle spill is so large, but this might make it smaller:

    val addition = (a: Int, b: Int) => a + b
    val wordsCount = wordsPair.combineByKey(identity, addition, addition)

This way only one entry per distinct word ends up in the shuffle for each partition, instead of one entry per word occurrence.

On Tue, Apr 29, 2014 at 7:48 AM, Liu, Raymond wrote:
> Hi Patrick
>
> I am just doing a simple word count; the data is generated by the
> Hadoop random text writer.
>
> This does not seem related to compression. If I turn off shuffle
> compression, the metrics look like the following for the smaller 240MB
> dataset:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
> 12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
> 13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
> 15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
> 16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB
>
> And the program is pretty simple:
>
>     val files = sc.textFile(args(1))
>     val words = files.flatMap(_.split(" "))
>     val wordsPair = words.map(x => (x, 1))
>
>     val wordsCount = wordsPair.reduceByKey(_ + _)
>     val count = wordsCount.count()
>
>     println("Number of words = " + count)
>
> Best Regards,
> Raymond Liu
>
> From: Patrick Wendell [mailto:pwend...@gmail.com]
>
> Could you explain more about what your job is doing and what data
> types you are using? These numbers alone don't necessarily indicate
> that something is wrong. The relationship between the in-memory and
> on-disk shuffle amounts is definitely a bit strange: the data gets
> compressed when written to disk, but unless you have a weird dataset
> (e.g. all zeros) I wouldn't expect it to compress _that_ much.
>
> On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond wrote:
> Hi
>
> I am running a simple word count program on a Spark standalone
> cluster. The cluster is made up of 6 nodes; each runs 4 workers, and
> each worker owns 10GB of memory and 16 cores, for 96 cores and 240GB
> of memory in total. (It was also previously configured as 1 worker
> with 40GB of memory per node.)
>
> I ran a very small data set (2.4GB in total on HDFS) to confirm the
> problem, as below.
>
> As you can read from the task metrics below, the shuffle spill
> metrics indicate that something is wrong:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
> 1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
> 10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
> 11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
> 12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost
> 5000x the actual shuffle data and the Shuffle Spill (Disk), and it
> also seems to me that the spill should by no means be triggered, since
> memory is not used up at all.
>
> To verify that, I further reduced the data size to 240MB in total.
> Here is the result:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
> 1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
> 10 | sr436:56352 | 16 s | 4 | 0 | 4 | 0.0 B | 700.9 KB | 814.9 MB | 181.6 KB
> 11 | sr437:53099 | 15 s | 4 | 0 | 4 | 0.0 B | 689.7 KB | 0.0 B | 0.0 B
> 12 | sr435:48318 | 15 s | 4 | 0 | 4 | 0.0 B | 702.1 KB | 427.4 MB | 90.7 KB
> 13 | sr433:59294 | 17 s | 4 | 0 | 4 | 0.0 B | 704.8 KB | 779.9 MB | 180.3 KB
>
> Nothing prevents spill from happening.
>
> Now, it seems to me that there must be something wrong with the
> spill-trigger code.
>
> Has anyone encountered this issue? By the way, I am using the latest
> trunk code.
>
> Best Regards,
> Raymond Liu
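One possible reading of the puzzling ratio between the two spill columns, offered here as an assumption rather than a statement about Spark's actual implementation: Shuffle Spill (Memory) accumulates the estimated in-memory size of the aggregation map each time it spills, while Shuffle Spill (Disk) counts the serialized bytes actually written. Java object overhead plus repeated spills can then make the first figure far larger than the second. A toy model in plain Scala (all names and numbers are made up for illustration):

```scala
// Toy model of spill accounting: the "memory" metric grows by the
// in-memory estimate at each spill, the "disk" metric by the (smaller)
// serialized size written out.
object SpillAccountingSketch {
  final case class Metrics(spillMemory: Long, spillDisk: Long)

  def simulate(records: Int, recordSerializedBytes: Long,
               inMemoryOverheadFactor: Long, spillThresholdBytes: Long): Metrics = {
    var inMemBytes = 0L
    var m = Metrics(0L, 0L)
    for (_ <- 1 to records) {
      // Each record occupies far more heap than its serialized form.
      inMemBytes += recordSerializedBytes * inMemoryOverheadFactor
      if (inMemBytes >= spillThresholdBytes) {
        m = Metrics(m.spillMemory + inMemBytes,
                    m.spillDisk + inMemBytes / inMemoryOverheadFactor)
        inMemBytes = 0L // the map is emptied after spilling
      }
    }
    m
  }

  def main(args: Array[String]): Unit = {
    val m = simulate(records = 1000000, recordSerializedBytes = 50,
                     inMemoryOverheadFactor = 10, spillThresholdBytes = 10L * 1024 * 1024)
    println(s"Spill (Memory) = ${m.spillMemory} B, Spill (Disk) = ${m.spillDisk} B")
  }
}
```

This model only explains a gap on the order of the object-overhead factor; it does not by itself explain Raymond's ~5000x ratio, which is why his suspicion of the spill trigger itself is reasonable.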
RE: Shuffle Spill Issue
Hi Patrick

I am just doing a simple word count; the data is generated by the Hadoop random text writer.

This does not seem related to compression. If I turn off shuffle compression, the metrics look like the following for the smaller 240MB dataset:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
10 | sr437:48527 | 35 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 2.2 GB | 1291.2 KB
12 | sr437:46077 | 34 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1822.6 MB | 1073.3 KB
13 | sr434:37896 | 31 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1099.2 MB | 621.2 KB
15 | sr438:52819 | 31 s | 8 | 0 | 8 | 0.0 B | 2.5 MB | 1898.8 MB | 1072.6 KB
16 | sr434:37103 | 32 s | 8 | 0 | 8 | 0.0 B | 2.4 MB | 1638.0 MB | 1044.6 KB

And the program is pretty simple:

    val files = sc.textFile(args(1))
    val words = files.flatMap(_.split(" "))
    val wordsPair = words.map(x => (x, 1))

    val wordsCount = wordsPair.reduceByKey(_ + _)
    val count = wordsCount.count()

    println("Number of words = " + count)

Best Regards,
Raymond Liu

From: Patrick Wendell [mailto:pwend...@gmail.com]

Could you explain more about what your job is doing and what data types you are using? These numbers alone don't necessarily indicate that something is wrong. The relationship between the in-memory and on-disk shuffle amounts is definitely a bit strange: the data gets compressed when written to disk, but unless you have a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond wrote:

Hi

I am running a simple word count program on a Spark standalone cluster. The cluster is made up of 6 nodes; each runs 4 workers, and each worker owns 10GB of memory and 16 cores, for 96 cores and 240GB of memory in total. (It was also previously configured as 1 worker with 40GB of memory per node.)

I ran a very small data set (2.4GB in total on HDFS) to confirm the problem, as below.

As you can read from the task metrics below, the shuffle spill metrics indicate that something is wrong:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB

You can see that the Shuffle Spill (Memory) is pretty high, almost 5000x the actual shuffle data and the Shuffle Spill (Disk), and it also seems to me that the spill should by no means be triggered, since memory is not used up at all.

To verify that, I further reduced the data size to 240MB in total. Here is the result:

Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
10 | sr436:56352 | 16 s | 4 | 0 | 4 | 0.0 B | 700.9 KB | 814.9 MB | 181.6 KB
11 | sr437:53099 | 15 s | 4 | 0 | 4 | 0.0 B | 689.7 KB | 0.0 B | 0.0 B
12 | sr435:48318 | 15 s | 4 | 0 | 4 | 0.0 B | 702.1 KB | 427.4 MB | 90.7 KB
13 | sr433:59294 | 17 s | 4 | 0 | 4 | 0.0 B | 704.8 KB | 779.9 MB | 180.3 KB

Nothing prevents spill from happening.

Now, it seems to me that there must be something wrong with the spill-trigger code.

Has anyone encountered this issue? By the way, I am using the latest trunk code.

Best Regards,
Raymond Liu
Re: Shuffle Spill Issue
Could you explain more about what your job is doing and what data types you are using? These numbers alone don't necessarily indicate that something is wrong. The relationship between the in-memory and on-disk shuffle amounts is definitely a bit strange: the data gets compressed when written to disk, but unless you have a weird dataset (e.g. all zeros) I wouldn't expect it to compress _that_ much.

On Mon, Apr 28, 2014 at 1:18 AM, Liu, Raymond wrote:
> Hi
>
> I am running a simple word count program on a Spark standalone
> cluster. The cluster is made up of 6 nodes; each runs 4 workers, and
> each worker owns 10GB of memory and 16 cores, for 96 cores and 240GB
> of memory in total. (It was also previously configured as 1 worker
> with 40GB of memory per node.)
>
> I ran a very small data set (2.4GB in total on HDFS) to confirm the
> problem, as below.
>
> As you can read from the task metrics below, the shuffle spill
> metrics indicate that something is wrong:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0 | sr437:42139 | 29 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 23.6 GB | 4.3 MB
> 1 | sr433:46935 | 1.1 min | 4 | 0 | 4 | 0.0 B | 4.2 MB | 19.0 GB | 3.4 MB
> 10 | sr436:53277 | 26 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.6 GB | 4.6 MB
> 11 | sr437:58872 | 32 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.0 GB | 4.4 MB
> 12 | sr435:48358 | 27 s | 4 | 0 | 4 | 0.0 B | 4.3 MB | 25.1 GB | 4.4 MB
>
> You can see that the Shuffle Spill (Memory) is pretty high, almost
> 5000x the actual shuffle data and the Shuffle Spill (Disk), and it
> also seems to me that the spill should by no means be triggered, since
> memory is not used up at all.
>
> To verify that, I further reduced the data size to 240MB in total.
> Here is the result:
>
> Executor ID | Address | Task Time | Total Tasks | Failed Tasks | Succeeded Tasks | Shuffle Read | Shuffle Write | Shuffle Spill (Memory) | Shuffle Spill (Disk)
> 0 | sr437:50895 | 15 s | 4 | 0 | 4 | 0.0 B | 703.0 KB | 80.0 MB | 43.2 KB
> 1 | sr433:50207 | 17 s | 4 | 0 | 4 | 0.0 B | 704.7 KB | 389.5 MB | 90.2 KB
> 10 | sr436:56352 | 16 s | 4 | 0 | 4 | 0.0 B | 700.9 KB | 814.9 MB | 181.6 KB
> 11 | sr437:53099 | 15 s | 4 | 0 | 4 | 0.0 B | 689.7 KB | 0.0 B | 0.0 B
> 12 | sr435:48318 | 15 s | 4 | 0 | 4 | 0.0 B | 702.1 KB | 427.4 MB | 90.7 KB
> 13 | sr433:59294 | 17 s | 4 | 0 | 4 | 0.0 B | 704.8 KB | 779.9 MB | 180.3 KB
>
> Nothing prevents spill from happening.
>
> Now, it seems to me that there must be something wrong with the
> spill-trigger code.
>
> Has anyone encountered this issue? By the way, I am using the latest
> trunk code.
>
> Best Regards,
> Raymond Liu
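For anyone wanting to probe the spill trigger experimentally: in Spark of this era, shuffle spilling was governed by configuration properties such as spark.shuffle.spill and spark.shuffle.memoryFraction. The sketch below shows how Raymond's word count could be run with spilling disabled to isolate the trigger logic. Treat it as a configuration sketch, not a recommendation: the property names should be verified against the documentation for your exact build, and it needs a Spark cluster to run.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical test harness: Raymond's word count with shuffle
// spilling disabled, to check whether the spill trigger is at fault.
object WordCountSpillTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("WordCountSpillTest")
      // Never spill the aggregation map to disk (risks OOM on big data).
      .set("spark.shuffle.spill", "false")
      // Alternatively, leave spilling on but give aggregation more heap:
      // .set("spark.shuffle.memoryFraction", "0.5")
    val sc = new SparkContext(conf)

    val counts = sc.textFile(args(0))
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)

    println("Number of words = " + counts.count())
    sc.stop()
  }
}
```

If the 240MB dataset still reports hundreds of MB of Shuffle Spill (Memory) with spilling disabled, the metric itself is suspect; if the spill disappears, the trigger condition is the place to look.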