We're training a recommender with ALS in MLlib 1.1 against a dataset of
150M users and 4.5K items, with a total of 1.2 billion training records
(~30 GB of data). The input data is spread across 1200 partitions on HDFS.
For the training, rank=10, and we've configured the number of user data
blocks to equal the number of item data blocks. The number of user/item
blocks was varied between 50 and 1200. Irrespective of the number of blocks
(e.g. at 1200 blocks each), there are at least a couple of tasks that end up
shuffle reading > 9.7 GB each in the aggregate stage (ALS.scala:337) and
failing with the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)
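
For reference, the training is invoked roughly as follows (a sketch only:
the input path, parsing, iteration count and lambda are placeholders, sc is
the SparkContext, and the single blocks argument to ALS.train is what we use
to keep user blocks equal to item blocks):

    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    // Parse the 1200-partition input into Rating(user, item, score).
    // Path and delimiter below are placeholders, not the real values.
    val ratings = sc.textFile("hdfs:///path/to/ratings", 1200).map { line =>
      val Array(user, item, score) = line.split('\t')
      Rating(user.toInt, item.toInt, score.toDouble)
    }

    // rank = 10; numBlocks was varied between 50 and 1200 across runs.
    // Iterations and lambda shown here are illustrative placeholders.
    val numBlocks = 1200
    val model = ALS.train(ratings, 10, 10, 0.01, numBlocks)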




As for the data, on the user side, the degree of a node in the connectivity
graph is relatively small. On the item side, however, the distribution is
heavily skewed: about 3.8K of the 4.5K items are connected to roughly 10^5
users each on average, around 100 items are connected to nearly 10^8 users
each, and the remaining items are connected to fewer than 10^5 users. With
such a skew in the connectivity graph, I'm unsure whether additional memory
or varying the number of blocks would help (given my limited understanding
of the MLlib implementation). Any suggestions to address the problem?
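
(For what it's worth, the item-side degree figures above were estimated
roughly along these lines; `ratings` is the RDD[Rating] fed to ALS, and the
log10 bucketing is just a quick way to eyeball the skew:)

    // Count distinct users per item (one edge per (item, user) pair).
    val itemDegrees = ratings
      .map(r => (r.product, r.user))
      .distinct()
      .map { case (item, _) => (item, 1L) }
      .reduceByKey(_ + _)

    // Histogram of items by order of magnitude of their degree.
    val degreeBuckets = itemDegrees
      .map { case (_, deg) => (math.floor(math.log10(deg.toDouble)).toInt, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(_._1)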


The test is being run on a standalone cluster of 3 hosts, each with 100 GB
RAM & 24 cores dedicated to the application. The additional configs I set,
specific to the shuffle and to reducing task failures, are as follows:

spark.core.connection.ack.wait.timeout=600
spark.shuffle.consolidateFiles=true
spark.shuffle.manager=SORT
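
(These are applied on the SparkConf before the context is created; the
master URL and app name below are placeholders, not the actual values:)

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")   // placeholder standalone master URL
      .setAppName("als-recommender")           // placeholder app name
      .set("spark.core.connection.ack.wait.timeout", "600")
      .set("spark.shuffle.consolidateFiles", "true")
      .set("spark.shuffle.manager", "SORT")
    val sc = new SparkContext(conf)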


The job execution summary is as follows:

Active Stages:

Stage Id  Description                                      Duration  Tasks: Succeeded/Total  Shuffle Read
2         aggregate at ALS.scala:337                       55 min    1197/1200 (3 failed)    141.6 GB

Completed Stages (5):

Stage Id  Description                                      Duration  Tasks: Succeeded/Total  Input    Shuffle Read  Shuffle Write
6         org.apache.spark.rdd.RDD.flatMap(RDD.scala:277)  12 min    1200/1200               29.9 GB  1668.4 MB     186.8 GB
5         mapPartitionsWithIndex at ALS.scala:250
3         map at ALS.scala:231
0         aggregate at ALS.scala:337
1         map at ALS.scala:228


Thanks,
Bharath
