Re: Large Task Size?
It was because of the latest change to task serialization: https://github.com/apache/spark/commit/1efb3698b6cf39a80683b37124d2736ebf3c9d9a

The task size is no longer limited by akka.frameSize, but we show warning messages if the task size is above 100 KB. Please check the objects referenced in the task closure. If member variables or methods of an enclosing object are referenced, the closure may pull in the entire parent object, which could be large.

-Xiangrui

On Sat, Jul 19, 2014 at 5:42 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
> [...]
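To illustrate the failure mode Xiangrui describes, here is a minimal sketch (the class and field names are hypothetical, not from the code under discussion): referencing a member field inside an RDD closure really references `this`, so the whole enclosing object is serialized into every task; copying the field into a local val first keeps the closure small.

    import org.apache.spark.rdd.RDD

    class Optimizer(val stepSize: Double) extends Serializable {
      // Imagine the enclosing object also holds large state.
      val hugeState: Array[Double] = new Array[Double](10 * 1000 * 1000)  // ~80 MB

      // Problem: `stepSize` is really `this.stepSize`, so the closure
      // captures `this` and ships hugeState with every task.
      def scaleBad(data: RDD[Double]): RDD[Double] =
        data.map(x => x * stepSize)

      // Fix: copy the field into a local val; only the Double is captured.
      def scaleGood(data: RDD[Double]): RDD[Double] = {
        val localStep = stepSize
        data.map(x => x * localStep)
      }
    }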
Re: Large Task Size?
I'm still having trouble with this one. Watching it, I've noticed that the first time around, the task size is large but not terrible (199 KB). It's on the second iteration of the optimization that the task size goes crazy (120 MB). Does anybody have any ideas why this might be happening? Is there any way I can view the data being encoded in the task description, so that I might be able to get an idea of why it is blowing up?

The line in question can be found at:
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L157

From the logs:

14/07/19 17:29:35 WARN scheduler.TaskSetManager: Stage 858 contains a task of very large size (199 KB). The maximum recommended task size is 100 KB.
14/07/19 17:30:22 WARN scheduler.TaskSetManager: Stage 857 contains a task of very large size (185 KB). The maximum recommended task size is 100 KB.
14/07/19 17:30:26 WARN scheduler.TaskSetManager: Stage 1029 contains a task of very large size (185 KB). The maximum recommended task size is 100 KB.
14/07/19 17:30:57 WARN scheduler.TaskSetManager: Stage 1202 contains a task of very large size (123414 KB). The maximum recommended task size is 100 KB.

From the web UI (connecting the stage numbers to line numbers):

Stage Id  Description
858       sample at GroupedGradientDescent.scala:157
857       collect at GroupedGradientDescent.scala:183
1029      collect at GroupedGradientDescent.scala:194
1202      sample at GroupedGradientDescent.scala:157

Kyle

On Tue, Jul 15, 2014 at 2:45 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
> [...]
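One rough way to chase down what is being encoded, sketched below under the assumption that plain Java serialization approximates what the closure serializer ships: serialize the suspect driver-side objects yourself on each iteration and watch which one grows.

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    // Java-serialized size of an object in bytes. A driver-side value whose
    // size balloons between iterations is a good candidate for the 120 MB task.
    def serializedSize(obj: AnyRef): Int = {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(obj)
      out.close()
      buffer.size()
    }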
Re: Large Task Size?
Ah, I didn't realize this was non-MLLib code. Do you mean to be sending stochasticLossHistory in the closure as well?

On Sun, Jul 13, 2014 at 1:05 AM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
> It uses the standard SquaredL2Updater, and I also tried broadcasting it as well.
>
> The input is an RDD created by taking the union of several inputs that have all been run against MLUtils.kFold to produce even more RDDs. If I run with 10 different inputs, each with 10 kFolds, that's 100 RDDs going into the union. I'm pretty certain that all of the input RDDs have clean closures. But I'm curious, is there a high overhead for running union? Could that create larger task sizes?
>
> Kyle
>
> On Sat, Jul 12, 2014 at 7:50 PM, Aaron Davidson ilike...@gmail.com wrote:
> > [...]
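For reference, a sketch of the input construction described in the quoted message; the `inputs` collection and the (inputId, foldId) keying are illustrative, not the actual GroupedGradientDescent code. MLUtils.kFold splits each input RDD into train/test pairs, and sc.union concatenates the partition lists of its parents without moving data by itself.

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.util.MLUtils
    import org.apache.spark.rdd.RDD

    def multiplexInputs(
        sc: SparkContext,
        inputs: Seq[RDD[LabeledPoint]]): RDD[((Int, Int), LabeledPoint)] = {
      val keyed = for {
        (input, inputId) <- inputs.zipWithIndex
        // kFold(rdd, numFolds, seed) returns (training, validation) pairs.
        ((train, _), foldId) <- MLUtils.kFold(input, 10, 42).zipWithIndex
      } yield train.map(point => ((inputId, foldId), point))
      // 10 inputs x 10 folds = 100 keyed RDDs unioned into one.
      sc.union(keyed)
    }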
Re: Large Task Size?
Yes, this is a proposed patch to MLLib so that you can use one RDD to train multiple models at the same time. I am hoping that multiplexing several models in the same RDD will be more efficient than trying to get the Spark scheduler to manage a few hundred tasks simultaneously.

I don't think I see stochasticLossHistory being included in the closure (please correct me if I'm wrong). It's used once on line 183 to capture the loss sums (a local operation on the results of a 'collect' call), and again on line 198 to update weightSet, but that's after the loop completes, and the memory blowup definitely happens before then.

Kyle

On Tue, Jul 15, 2014 at 12:00 PM, Aaron Davidson ilike...@gmail.com wrote:
> [...]
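To make the distinction concrete, a minimal sketch (only the buffer name comes from the thread; the rest is illustrative): appending the results of a collect() to a driver-side buffer never enters a task closure, whereas referencing that buffer inside an RDD transformation would capture it.

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.rdd.RDD

    val stochasticLossHistory = ArrayBuffer.empty[Double]

    // Driver-local: collect() returns a plain Array on the driver, and
    // appending it to the buffer involves no task closure at all.
    def recordLosses(lossSums: RDD[Double]): Unit = {
      stochasticLossHistory ++= lossSums.collect()
    }

    // By contrast, this would capture the buffer (and, if it were a member
    // of an enclosing class, that whole class) in the task closure:
    // lossSums.map(loss => loss - stochasticLossHistory.last)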
Large Task Size?
I'm working on a patch to MLLib that allows for multiplexing several different model optimizations over the same RDD (SPARK-2372: https://issues.apache.org/jira/browse/SPARK-2372).

In testing larger datasets, I've started to see some memory errors (java.lang.OutOfMemoryError and "exceeds max allowed: spark.akka.frameSize" errors). My main clue is that Spark will start logging warnings on smaller systems like:

14/07/12 19:14:46 WARN scheduler.TaskSetManager: Stage 2862 contains a task of very large size (10119 KB). The maximum recommended task size is 100 KB.

Looking up stage '2862' in the web UI leads to a 'sample at GroupedGradientDescent.scala:156' call. That code can be seen at:
https://github.com/kellrott/spark/blob/mllib-grouped/mllib/src/main/scala/org/apache/spark/mllib/grouped/GroupedGradientDescent.scala#L156

I've looked over the code, I'm broadcasting the larger variables, and between the sampler and the combineByKey, I wouldn't think there'd be much data being moved over the network, much less a 10 MB chunk. Any ideas of what this might be a symptom of?

Kyle
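The broadcast pattern mentioned here, as a minimal sketch (the names and the dot-product computation are illustrative): a broadcast variable is shipped once per executor, so the per-task closure only carries a small handle instead of a copy of the value.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    def scaledSums(sc: SparkContext, data: RDD[Array[Double]],
                   weights: Array[Double]): RDD[Double] = {
      // One copy of the (potentially large) weights per executor,
      // instead of one copy serialized into every task.
      val weightsBC = sc.broadcast(weights)
      data.map { features =>
        features.zip(weightsBC.value).map { case (f, w) => f * w }.sum
      }
    }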
Re: Large Task Size?
I also did a quick glance through the code and couldn't find anything worrying that should be included in the task closures. The only possibly unsanitary part is the Updater you pass in -- what is your Updater, and is it possible it's dragging in a significant amount of extra state?

On Sat, Jul 12, 2014 at 7:27 PM, Kyle Ellrott kellr...@soe.ucsc.edu wrote:
> [...]
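To illustrate how an Updater could drag in extra state, a hypothetical sketch (names invented for the example): an Updater defined as an inner class of a large driver object carries an implicit $outer reference, so serializing the updater serializes the outer object too.

    import org.apache.spark.mllib.optimization.SquaredL2Updater

    class TrainingDriver extends Serializable {
      val cachedFeatures: Array[Double] = new Array[Double](10 * 1000 * 1000)

      // Inner class: instances hold an implicit $outer reference to
      // TrainingDriver, so shipping the updater ships cachedFeatures too.
      class HeavyUpdater extends SquaredL2Updater
    }

    // A top-level updater has no outer reference and serializes to a
    // few bytes.
    object CleanUpdater extends SquaredL2Updater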