For ML applications, the best setting is to set the number of partitions
to match the number of cores, which reduces shuffle size. You have 3072
partitions but 128 executors, which causes the overhead. For the
MultivariateOnlineSummarizer, we plan to add flags in 1.5 to specify what
needs to be computed, to reduce the overhead. -Xiangrui
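
As a rough sanity check on the numbers quoted below, here is a minimal
Python sketch of the result-size arithmetic. It rests on two assumptions
that are not confirmed in this thread: that each task returns about 7 dense
arrays of doubles per feature (the figure Imran gives), and that the
reduction loop in treeAggregate shrinks the partition count roughly as
coded here (my reading of the 1.3-era RDD.treeAggregate; check the source
before relying on it). The function names are hypothetical, and
serialization overhead is ignored.

```python
import math

BYTES_PER_DOUBLE = 8


def summarizer_bytes(num_features, arrays_per_feature=7):
    """Approximate size of one dense summarizer, ignoring serialization overhead.

    arrays_per_feature=7 is taken from the quoted message, not measured.
    """
    return num_features * arrays_per_feature * BYTES_PER_DOUBLE


def final_level_partitions(num_partitions, depth=2):
    """Assumed shape of the tree reduction: divide the partition count by
    `scale` for as long as an extra level is expected to help."""
    scale = max(math.ceil(num_partitions ** (1.0 / depth)), 2)
    while num_partitions > scale + num_partitions // scale:
        num_partitions //= scale
    return num_partitions


num_features = 3_700_000                       # ~3.7 million features
tasks = final_level_partitions(3072, depth=2)  # partitions left at the last level
per_task = summarizer_bytes(num_features)      # bytes returned by each task
total = tasks * per_task                       # bytes arriving at the driver

print(f"tasks at final level: {tasks}")
print(f"per task:  {per_task / 1e6:.1f} MB")
print(f"to driver: {total / 2**30:.1f} GiB")
```

Under these assumptions, 3072 partitions at depth 2 leave 54 tasks at the
final level, each returning about 207 MB, which lines up with the
"54 tasks (10.4 GB)" in the error message. Calling
final_level_partitions(3072, depth=3) or starting from fewer partitions
shows how either change would shrink what the driver has to collect.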

On Mon, May 18, 2015 at 7:00 PM, Imran Rashid <iras...@cloudera.com> wrote:
> I'm not super familiar with this part of the code, but from taking a quick
> look:
>
> a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
> per feature (mean, max, min, etc. etc.)
> b) The limit is on the result size from *all* tasks, not from one task.  You
> start with 3072 tasks
> c) tree aggregate should first merge things down to about 8 partitions
> before bringing results back to the driver, which is how you end up with 54
> tasks at your failure.
>
> this means you should have about 30 MB per task per measure * 54 tasks * 7
> measures, which comes to about 11GB, or in the ballpark of what you found.
>
> In principle, you could get this working by adding more levels to the
> treeAggregate (the depth parameter), but looks like that isn't exposed.  You
> could also try coalescing your data down to a smaller set of partitions
> first, but that comes with other downsides.
>
> Perhaps an MLLib expert could chime in on an alternate approach.  My feeling
> (from a very quick look) is that there is room for some optimization in the
> internals.
>
> Imran
>
> On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia
> <mchett...@rocketfuelinc.com> wrote:
>>
>> Hi,
>>
>> I am trying to validate our modeling data pipeline by running
>> LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
>> basically to compute AUC. This is on Spark 1.3.0.
>>
>> I am using 128 executors with 4 GB each + driver with 8 GB. The number of
>> data partitions is 3072.
>>
>> The execution fails with the following messages:
>>
>> Total size of serialized results of 54 tasks (10.4 GB) is bigger than
>> spark.driver.maxResultSize (3.0 GB)
>>
>> The associated stage in the job is treeAggregate at
>> StandardScaler.scala:52 : The call stack looks as below:
>>
>> org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
>> org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
>>
>> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
>>
>> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)
>>
>>
>> I am trying to both understand why such a large amount of data needs to be
>> passed back to driver as well as figure out a way around this. I also want
>> to understand how much memory is required, as a function of dataset size,
>> feature set size, and number of iterations performed, for future
>> experiments.
>>
>> From looking at the MLLib code, the largest data structure seems to be a
>> dense vector of the same size as the feature set. I am not familiar with
>> the algorithm or its implementation, but I would guess 3.7 million features
>> would lead to a constant multiple of ~3.7 million * 8 bytes ~ 30 MB. So how
>> does the dataset size become so large?
>>
>> I looked into the treeAggregate and it looks like hierarchical
>> aggregation. If the data being sent to the driver is basically the
>> aggregated coefficients (i.e. dense vectors) for the final aggregation,
>> can't the dense vectors from executors be pulled in one at a time and merged
>> in memory, rather than pulling all of them in together? (This is a totally
>> uneducated guess, so I may be completely off here.)
>>
>> Is there a way to get this running?
>>
>> Thanks,
>> pala
>
>
