Any suggestions to address the described problem? In particular, given the
skewed degree of some of the item nodes in the graph, I believe it should be
possible to define better block sizes to reflect that fact, but I'm unsure
how to arrive at those sizes.
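
For concreteness, here's a rough sketch of the kind of sizing calculation I
have in mind; the ratings handle, the per-record byte estimate and the
per-block budget below are placeholder guesses rather than measured values:

    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.rdd.RDD

    // Pick a block count so that the expected shuffle data per block stays
    // well below the 2 GB limit a single on-disk block hits when mapped.
    def estimateBlocks(ratings: RDD[Rating],
                       bytesPerRating: Long = 32L,          // rough serialized-size guess
                       targetBlockBytes: Long = 512L << 20  // aim for ~512 MB per block
                      ): Int = {
      val totalBytes = ratings.count() * bytesPerRating
      math.max((totalBytes / targetBlockBytes).toInt + 1, 1)
    }

    // One of the mllib 1.1 ALS.train overloads takes an explicit blocks count
    // (applied to both the user and the item side):
    // val model = ALS.train(ratings, 10 /* rank */, 10 /* iterations */,
    //                       0.01 /* lambda */, estimateBlocks(ratings))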

Thanks,
Bharath

On Fri, Nov 28, 2014 at 12:00 AM, Bharath Ravi Kumar <reachb...@gmail.com>
wrote:

> We're training a recommender with ALS in mllib 1.1 against a dataset of
> 150M users and 4.5K items, with the total number of training records being
> 1.2 billion (~30GB of data). The input data is spread across 1200 partitions
> on HDFS. For the training, rank=10, and we've configured {number of user
> data blocks = number of item data blocks}. The number of user/item blocks
> was varied between 50 and 1200. Irrespective of the block count (e.g. at
> 1200 blocks each), there are at least a couple of tasks that end up shuffle
> reading > 9.7G each in the aggregate stage (ALS.scala:337) and failing with
> the following exception:
>
> java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>         at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
>         at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
>         at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
>         at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
>         at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
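
As far as I can tell, the exception itself comes from Java NIO's limit on a
single memory-mapped region (at most Integer.MAX_VALUE bytes, i.e. ~2 GB)
rather than from ALS; a standalone snippet like the one below, with an
arbitrary file path, produces the same message:

    import java.io.RandomAccessFile
    import java.nio.channels.FileChannel

    val raf = new RandomAccessFile("/tmp/mmap-limit-test", "rw")
    try {
      // A mapping request larger than Integer.MAX_VALUE bytes is rejected
      // up front, before any file I/O happens.
      raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0L,
                         Integer.MAX_VALUE.toLong + 1)
    } catch {
      case e: IllegalArgumentException =>
        println(e.getMessage) // prints "Size exceeds Integer.MAX_VALUE"
    } finally {
      raf.close()
    }
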
>
> As for the data, on the user side, the degree of a node in the
> connectivity graph is relatively small. On the item side, however, 3.8K of
> the 4.5K items are connected to ~10^5 users each on average, with 100 items
> being connected to nearly 10^8 users. The remaining items are connected to
> fewer than 10^5 users. With such a skew in the connectivity graph, I'm
> unsure whether additional memory or a different choice of block sizes would
> help (given my limited understanding of the implementation in mllib). Any
> suggestions to address the problem?
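
For reference, this is roughly how the item-degree distribution described
above can be recomputed from the training data; the ratings handle and the
bucket thresholds are placeholders:

    import org.apache.spark.mllib.recommendation.Rating
    import org.apache.spark.rdd.RDD

    // Count the ratings per item (one per connected user), then bucket the
    // items by that degree.
    def itemDegreeHistogram(ratings: RDD[Rating]): Map[String, Long] = {
      val degrees = ratings.map(r => (r.product, 1L)).reduceByKey(_ + _)
      degrees.map { case (_, d) =>
        if (d >= 10000000L) ">= 10^7 users"
        else if (d >= 100000L) "10^5 to 10^7 users"
        else "< 10^5 users"
      }.countByValue().toMap
    }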
>
>
> The test is being run on a standalone cluster of 3 hosts, each with 100G of
> RAM & 24 cores dedicated to the application. The additional configs I set,
> specific to shuffle behavior and reducing task failures, are as follows:
>
> spark.core.connection.ack.wait.timeout=600
> spark.shuffle.consolidateFiles=true
> spark.shuffle.manager=SORT
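
In code form, these correspond to (a sketch, assuming the conf is built in
the driver before creating the context):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .set("spark.core.connection.ack.wait.timeout", "600")
      .set("spark.shuffle.consolidateFiles", "true")
      .set("spark.shuffle.manager", "SORT")
    // val sc = new SparkContext(conf)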
>
>
> The job execution summary is as follows:
>
> Active Stages:
>
> Stage 2, aggregate at ALS.scala:337, duration 55 min, tasks 1197/1200
> (3 failed), shuffle read 141.6 GB
>
> Completed Stages (5):
>
> Stage 6, org.apache.spark.rdd.RDD.flatMap(RDD.scala:277), duration 12 min,
> tasks 1200/1200, input 29.9 GB, shuffle read 1668.4 MB, shuffle write
> 186.8 GB
>
> Stage 5, mapPartitionsWithIndex at ALS.scala:250
> Stage 3, map at ALS.scala:231
> Stage 0, aggregate at ALS.scala:337
> Stage 1, map at ALS.scala:228
>
>
> Thanks,
> Bharath
>
