Re: LogisticRegressionWithLBFGS with large feature set
For ML applications, the best setting is to set the number of partitions to match the number of cores, to reduce shuffle size. You have 3072 partitions but 128 executors, which causes the overhead. For the MultivariateOnlineSummarizer, we plan to add flags in 1.5 to specify what needs to be computed, to reduce the overhead.

-Xiangrui

On Mon, May 18, 2015 at 7:00 PM, Imran Rashid iras...@cloudera.com wrote:
Re: LogisticRegressionWithLBFGS with large feature set
I'm not super familiar with this part of the code, but from taking a quick look:

a) The code creates a MultivariateOnlineSummarizer, which stores 7 doubles per feature (mean, max, min, etc.).

b) The limit is on the result size from *all* tasks, not from one task. You start with 3072 tasks.

c) treeAggregate should first merge things down to about 8 partitions before bringing results back to the driver, which is how you end up with 54 tasks at your failure.

This means you should have about 30 MB per task per measure * 54 tasks * 7 measures, which comes to about 11 GB, or in the ballpark of what you found.

In principle, you could get this working by adding more levels to the treeAggregate (the depth parameter), but it looks like that isn't exposed. You could also try coalescing your data down to a smaller set of partitions first, but that comes with other downsides.

Perhaps an MLlib expert could chime in on an alternate approach. My feeling (from a very quick look) is that there is room for some optimization in the internals.

Imran

On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia mchett...@rocketfuelinc.com wrote:
LogisticRegressionWithLBFGS with large feature set
Hi,

I am trying to validate our modeling data pipeline by running LogisticRegressionWithLBFGS on a dataset with ~3.7 million features, basically to compute AUC. This is on Spark 1.3.0.

I am using 128 executors with 4 GB each, plus a driver with 8 GB. The number of data partitions is 3072.

The execution fails with the following message:

*Total size of serialized results of 54 tasks (10.4 GB) is bigger than spark.driver.maxResultSize (3.0 GB)*

The associated stage in the job is treeAggregate at StandardScaler.scala:52 (http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0).

The call stack looks as below:

org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)

I am trying both to understand why such a large amount of data needs to be passed back to the driver and to figure out a way around this. I also want to understand how much memory is required, as a function of dataset size, feature set size, and number of iterations performed, for future experiments.

From looking at the MLlib code, the largest data structure seems to be a dense vector of the same size as the feature set. I am not familiar with the algorithm or its implementation, but I would guess that 3.7 million features would lead to a constant multiple of ~3.7 million * 8 bytes ~ 30 MB. So how does the result size become so large?

I looked into treeAggregate and it looks like hierarchical aggregation. If the data being sent to the driver is basically the aggregated coefficients (i.e. dense vectors) for the final aggregation, can't the dense vectors from the executors be pulled in one at a time and merged in memory, rather than pulling all of them in together? (This is a totally uneducated guess, so I may be completely off here.)

Is there a way to get this running?

Thanks,
pala
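
For readers who want to redo the arithmetic in this thread, here is a rough sketch in plain Python (no Spark needed). It models how many partial results reach the driver and how large they are in total. Everything in it is an assumption made for illustration: the function names are hypothetical, the loop is a simplified model of what RDD.treeAggregate appears to do in Spark 1.x rather than code taken from Spark, each task is assumed to return 7 dense arrays of doubles (the "7 doubles per feature" from Imran's reply), and sizes are assumed to be reported in binary units. Serialization overhead is ignored.

```python
import math

BYTES_PER_DOUBLE = 8
GIB = 1024 ** 3


def last_level_partitions(num_partitions, depth=2):
    """Estimate how many partial results are sent to the driver.

    ASSUMPTION: a simplified model of the level-merging loop in Spark 1.x
    RDD.treeAggregate, written from memory for estimation only.
    """
    scale = max(math.ceil(num_partitions ** (1.0 / depth)), 2)
    # Keep adding a merge level while it still reduces the partition count.
    while num_partitions > scale + num_partitions // scale:
        num_partitions //= scale
    return num_partitions


def driver_result_gib(num_tasks, num_features, arrays_per_task=7):
    """Total size in GiB of dense double arrays returned by all tasks."""
    return num_tasks * arrays_per_task * num_features * BYTES_PER_DOUBLE / GIB


features = 3_700_000  # "~3.7 million features" from the original post

tasks = last_level_partitions(3072, depth=2)
print(tasks, round(driver_result_gib(tasks, features), 1))  # 54 10.4

# Hypothetical variations: one more tree level, or 128 partitions.
for partitions, depth in [(3072, 3), (128, 2)]:
    tasks = last_level_partitions(partitions, depth)
    print(partitions, depth, tasks, round(driver_result_gib(tasks, features), 1))
```

Under these assumptions the model reproduces the 54 tasks and roughly 10.4 GB from the error message, and suggests that either a deeper tree or fewer input partitions would bring the total under the 3.0 GB limit. Treat the numbers as estimates only: the depth parameter is not exposed by StandardScaler according to Imran's reply, and using fewer partitions has costs of its own.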