Re: LogisticRegressionWithLBFGS with large feature set

2015-05-19 Thread Xiangrui Meng
For ML applications, the best setting is to set the number of partitions to
match the number of cores, to reduce the shuffle size. You have 3072
partitions but only 128 executors, which causes the overhead. For the
MultivariateOnlineSummarizer, we plan to add flags in 1.5 to specify what
needs to be computed, to reduce the overhead. -Xiangrui
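
A minimal sketch of the partitions-to-cores suggestion above (assuming an
RDD[LabeledPoint] named training; the 512 = 128 executors * 4 cores figure is
illustrative, the actual cores per executor was not stated in the thread):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Shrink the partition count to roughly the total number of cores before
// training, so each aggregation level produces fewer task results.
def trainWithMatchedPartitions(training: RDD[LabeledPoint],
                               totalCores: Int = 512) = {
  val repartitioned = training.coalesce(totalCores)
  new LogisticRegressionWithLBFGS().run(repartitioned)
}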

On Mon, May 18, 2015 at 7:00 PM, Imran Rashid iras...@cloudera.com wrote:
 I'm not super familiar with this part of the code, but from taking a quick
 look:

 a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
 per feature (mean, max, min, etc. etc.)
 b) The limit is on the result size from *all* tasks, not from one task. You
 start with 3072 tasks.
 c) treeAggregate should first merge things down to a much smaller number of
 partitions (with the default depth of 2, roughly sqrt(3072) ≈ 56) before
 bringing results back to the driver, which is how you end up with 54 tasks
 at your failure.

 this means you should have about 30 MB per measure per task * 7 measures * 54
 tasks, which comes to about 11 GB, in the ballpark of what you found.

 In principle, you could get this working by adding more levels to the
 treeAggregate (the depth parameter), but it looks like that isn't exposed. You
 could also try coalescing your data down to a smaller set of partitions
 first, but that comes with other downsides.

 Perhaps an MLlib expert could chime in on an alternate approach. My feeling
 (from a very quick look) is that there is room for some optimization in the
 internals.

 Imran

 On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia
 mchett...@rocketfuelinc.com wrote:

 Hi,

 I am trying to validate our modeling data pipeline by running
 LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
 basically to compute AUC. This is on Spark 1.3.0.

 I am using 128 executors with 4 GB each + a driver with 8 GB. The number of
 data partitions is 3072.

 The execution fails with the following messages:

 Total size of serialized results of 54 tasks (10.4 GB) is bigger than
 spark.driver.maxResultSize (3.0 GB)

 The associated stage in the job is treeAggregate at StandardScaler.scala:52.
 The call stack looks as below:

 org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
 org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)

 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)

 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)


 I am trying both to understand why such a large amount of data needs to be
 passed back to the driver, and to figure out a way around this. I also want
 to understand how much memory is required, as a function of dataset size,
 feature set size, and number of iterations performed, for future
 experiments.

 From looking at the MLlib code, the largest data structure seems to be a
 dense vector of the same size as the feature set. I am not familiar with the
 algorithm or its implementation, but I would guess 3.7 million features would
 lead to a constant multiple of ~3.7M * 8 bytes ≈ 30 MB. So how does the
 serialized result size become so large?

 I looked into treeAggregate and it looks like hierarchical aggregation. If
 the data being sent to the driver is basically the aggregated coefficients
 (i.e. dense vectors) for the final aggregation, couldn't the dense vectors
 from executors be pulled in one at a time and merged in memory, rather than
 pulling all of them in together? (This is a totally uneducated guess, so I
 may be completely off here.)

 Is there a way to get this running?

 Thanks,
 pala





Re: LogisticRegressionWithLBFGS with large feature set

2015-05-18 Thread Imran Rashid
I'm not super familiar with this part of the code, but from taking a quick
look:

a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
per feature (mean, max, min, etc. etc.)
b) The limit is on the result size from *all* tasks, not from one task.
You start with 3072 tasks.
c) treeAggregate should first merge things down to a much smaller number of
partitions (with the default depth of 2, roughly sqrt(3072) ≈ 56) before
bringing results back to the driver, which is how you end up with 54 tasks at
your failure.

this means you should have about 30 MB per measure per task * 7 measures * 54
tasks, which comes to about 11 GB, in the ballpark of what you found.
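
For what it's worth, a rough sketch of where the 54 comes from, paraphrasing
the partition-count logic inside RDD.treeAggregate (not verbatim Spark source,
treat it as an approximation):

// With the default depth of 2 and 3072 input partitions:
var numPartitions = 3072
val depth = 2
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) // 56
while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
  numPartitions /= scale // 3072 / 56 = 54, the 54 tasks in the error message
}
// Each of those 54 results carries 7 arrays of ~3.7M doubles, i.e. roughly
// 7 * 30 MB ~ 207 MB per task, and 54 * 207 MB ~ 11 GB in total.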

In principle, you could get this working by adding more levels to the
treeAggregate (the depth parameter), but it looks like that isn't exposed.
You could also try coalescing your data down to a smaller set of partitions
first, but that comes with other downsides.
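
If someone wanted to experiment with a deeper tree anyway, the aggregation
that StandardScaler.fit performs can be reproduced directly with treeAggregate
and an explicit depth. A rough sketch (it only computes the summary statistics
and does not wire the result back into LogisticRegressionWithLBFGS, so it
illustrates the idea rather than being a drop-in fix):

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD

// Same per-feature summary that StandardScaler.fit builds, but with a deeper
// aggregation tree so fewer task results reach the driver in the final level.
def summarizeWithDepth(data: RDD[LabeledPoint],
                       depth: Int = 3): MultivariateOnlineSummarizer = {
  data.map(_.features).treeAggregate(new MultivariateOnlineSummarizer)(
    seqOp = (summary, features) => summary.add(features),
    combOp = (s1, s2) => s1.merge(s2),
    depth = depth)
}

With depth = 3 and 3072 input partitions, the final level would have around 13
tasks instead of 54, i.e. very roughly 2.7 GB of fetched results instead of
~11 GB, at the cost of an extra aggregation round.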

Perhaps an MLlib expert could chime in on an alternate approach.  My
feeling (from a very quick look) is that there is room for some
optimization in the internals.

Imran

On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia mchett...@rocketfuelinc.com
 wrote:

 Hi,

 I am trying to validate our modeling data pipeline by running
 LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
 basically to compute AUC. This is on Spark 1.3.0.

 I am using 128 executors with 4 GB each + a driver with 8 GB. The number of
 data partitions is 3072.

 The execution fails with the following messages:

 *Total size of serialized results of 54 tasks (10.4 GB) is bigger than
 spark.driver.maxResultSize (3.0 GB)*

 The associated stage in the job is treeAggregate at StandardScaler.scala:52
 (http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0).
 The call stack looks as below:

 org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
 org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
 org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)


 I am trying both to understand why such a large amount of data needs to be
 passed back to the driver, and to figure out a way around this. I also want
 to understand how much memory is required, as a function of dataset size,
 feature set size, and number of iterations performed, for future
 experiments.

 From looking at the MLlib code, the largest data structure seems to be a
 dense vector of the same size as the feature set. I am not familiar with the
 algorithm or its implementation, but I would guess 3.7 million features would
 lead to a constant multiple of ~3.7M * 8 bytes ≈ 30 MB. So how does the
 serialized result size become so large?

 I looked into treeAggregate and it looks like hierarchical aggregation. If
 the data being sent to the driver is basically the aggregated coefficients
 (i.e. dense vectors) for the final aggregation, couldn't the dense vectors
 from executors be pulled in one at a time and merged in memory, rather than
 pulling all of them in together? (This is a totally uneducated guess, so I
 may be completely off here.)

 Is there a way to get this running?

 Thanks,
 pala



LogisticRegressionWithLBFGS with large feature set

2015-05-14 Thread Pala M Muthaia
Hi,

I am trying to validate our modeling data pipeline by running
LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
basically to compute AUC. This is on Spark 1.3.0.

I am using 128 executors with 4 GB each + a driver with 8 GB. The number of
data partitions is 3072.
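
For reference, a much-simplified sketch of the failing step (names are
illustrative; the real pipeline builds the RDD[LabeledPoint]s elsewhere):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Train on ~3.7M (sparse) features, then score a held-out set to get AUC.
def trainAndEvaluate(train: RDD[LabeledPoint], test: RDD[LabeledPoint]): Double = {
  val model = new LogisticRegressionWithLBFGS().run(train) // fails in StandardScaler.fit
  model.clearThreshold() // return raw scores instead of 0/1 predictions
  val scoreAndLabel = test.map(p => (model.predict(p.features), p.label))
  new BinaryClassificationMetrics(scoreAndLabel).areaUnderROC()
}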

The execution fails with the following messages:

*Total size of serialized results of 54 tasks (10.4 GB) is bigger than
spark.driver.maxResultSize (3.0 GB)*

The associated stage in the job is treeAggregate at StandardScaler.scala:52
(http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3attempt=0).
The call stack looks as below:

org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)


I am trying both to understand why such a large amount of data needs to be
passed back to the driver, and to figure out a way around this. I also want
to understand how much memory is required, as a function of dataset size,
feature set size, and number of iterations performed, for future
experiments.

From looking at the MLlib code, the largest data structure seems to be a
dense vector of the same size as the feature set. I am not familiar with the
algorithm or its implementation, but I would guess 3.7 million features would
lead to a constant multiple of ~3.7M * 8 bytes ≈ 30 MB. So how does the
serialized result size become so large?

I looked into treeAggregate and it looks like hierarchical aggregation. If
the data being sent to the driver is basically the aggregated coefficients
(i.e. dense vectors) for the final aggregation, couldn't the dense vectors
from executors be pulled in one at a time and merged in memory, rather than
pulling all of them in together? (This is a totally uneducated guess, so I
may be completely off here.)
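
To illustrate what I mean by pulling them in one at a time (a thought
experiment only, not code from our pipeline):

import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
import org.apache.spark.rdd.RDD

// Merge per-partition partial summaries on the driver one partition at a
// time: toLocalIterator fetches a single partition per job, so only one
// partial result (plus the running merge) is resident on the driver.
def mergeOneAtATime(partials: RDD[MultivariateOnlineSummarizer]): MultivariateOnlineSummarizer =
  partials.toLocalIterator.reduce((a, b) => a.merge(b))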

Is there a way to get this running?

Thanks,
pala