I'm not super familiar with this part of the code, but from taking a quick
look:

a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
per feature (mean, max, min, etc. etc.)
b) The limit is on the result size from *all* tasks, not from one task.
You start with 3072 tasks
c) tree aggregate should first merge things down to about 8 partitions
before bringing results back to the driver, which is how you end up with 54
tasks at your failure.

this means you should have about 30 MB / per task per meaure * 54 tasks * 7
measures, which comes to about 11GB, or in the ballpark of what you found.

In principle, you could get this working by adding more levels to the
treeAggregate (the depth parameter), but looks like that isn't exposed.
You could also try coalescing your data down to a smaller set of partitions
first, but that comes with other downsides.

Perhaps an MLLib expert could chime in on an alternate approach.  My
feeling (from a very quick look) is that there is room for some
optimization in the internals

Imran

On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia <mchett...@rocketfuelinc.com
> wrote:

> Hi,
>
> I am trying to validate our modeling data pipeline by running
> LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
> basically to compute AUC. This is on Spark 1.3.0.
>
> I am using 128 executors with 4 GB each + driver with 8 GB. The number of
> data partitions is 3072
>
> The execution fails with the following messages:
>
> *Total size of serialized results of 54 tasks (10.4 GB) is bigger than
> spark.driver.maxResultSize (3.0 GB)*
>
> The associated stage in the job is treeAggregate at
> StandardScaler.scala:52
> <http://lsv-10.rfiserve.net:18080/history/application_1426202183036_633264/stages/stage?id=3&attempt=0>
>  :
> The call stack looks as below:
>
> org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
> org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)
>
>
> I am trying to both understand why such large amount of data needs to be
> passed back to driver as well as figure out a way around this. I also want
> to understand how much memory is required, as a function of dataset size,
> feature set size, and number of iterations performed, for future
> experiments.
>
> From looking at the MLLib code, the largest data structure seems to be a
> dense vector of the same size as feature set. I am not familiar with
> algorithm or its implementation I would guess 3.7 million features would
> lead to a constant multiple of ~3.7 * 8 ~ 30 MB. So how does the dataset
> size become so large?
>
> I looked into the treeAggregate and it looks like hierarchical
> aggregation. If the data being sent to the driver is basically the
> aggregated coefficients (i.e. dense vectors) for the final aggregation,
> can't the dense vectors from executors be pulled in one at a time and
> merged in memory, rather than pulling all of them in together? (This is
> totally uneducated guess so i may be completely off here).
>
> Is there a way to get this running?
>
> Thanks,
> pala
>

Reply via email to