We're training a recommender with ALS in MLlib 1.1 against a dataset of 150M users and 4.5K items, with a total of 1.2 billion training records (~30 GB of data). The input data is spread across 1200 partitions on HDFS. For the training, rank = 10, and we've configured the number of user data blocks to equal the number of item data blocks, varying that count from 50 to 1200.
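For reference, a minimal sketch of how the job is wired up (the input path, delimiter, and the iterations/lambda values below are placeholders rather than our actual settings; the block count shown is one of the values we tested):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    object TrainRecommender {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("als-training")
        val sc = new SparkContext(conf)

        // ~1.2 billion (user, item, rating) records in 1200 HDFS partitions
        val ratings = sc.textFile("hdfs:///path/to/ratings")   // placeholder path
          .map(_.split("\t"))                                  // placeholder delimiter
          .map(f => Rating(f(0).toInt, f(1).toInt, f(2).toDouble))

        // rank = 10; the last argument sets the number of user/item blocks
        // (we varied it between 50 and 1200). The iterations and lambda
        // values here are placeholders.
        val model = ALS.train(ratings, 10, 20, 0.01, 1200)
      }
    }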
Irrespective of the block count (e.g. at 1200 blocks each), at least a couple of tasks end up shuffle-reading > 9.7 GB each in the aggregate stage (ALS.scala:337) and fail with the following exception:

    java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
        at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
        at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
        at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
        at org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
        at org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)

As for the data: on the user side, the degree of a node in the connectivity graph is relatively small. On the item side, however, 3.8K of the 4.5K items are connected to about 10^5 users each on average, and roughly 100 items are connected to nearly 10^8 users. The remaining items are connected to fewer than 10^5 users. With such skew in the connectivity graph, I'm unsure whether additional memory or varying the block count would help (given my limited understanding of the MLlib implementation). Any suggestions on how to address the problem?

The test is being run on a standalone cluster of 3 hosts, each with 100 GB RAM and 24 cores dedicated to the application. The additional configs I set, specific to the shuffle and to reducing task failures, are as follows:

    spark.core.connection.ack.wait.timeout=600
    spark.shuffle.consolidateFiles=true
    spark.shuffle.manager=SORT

The job execution summary is as follows:

Active Stages:
Stage Id  Description                  Duration  Tasks: Succeeded/Total  Shuffle Read
2         aggregate at ALS.scala:337   55 min    1197/1200 (3 failed)    141.6 GB

Completed Stages (5):
Stage Id  Description                                       Duration  Tasks: Succeeded/Total  Input    Shuffle Read  Shuffle Write
6         org.apache.spark.rdd.RDD.flatMap(RDD.scala:277)   12 min    1200/1200               29.9 GB  1668.4 MB     186.8 GB
5         mapPartitionsWithIndex at ALS.scala:250
3         map at ALS.scala:231
0         aggregate at ALS.scala:337
1         map at ALS.scala:228

Thanks,
Bharath
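P.S. In case it's useful for reproducing the connectivity numbers above, a sketch along these lines computes the item degree distribution (it reuses the ratings RDD from the earlier sketch and assumes each input record is a distinct user-item pair):

    // Degree of each item = number of users it is connected to
    val itemDegrees = ratings
      .map(r => (r.product, 1L))
      .reduceByKey(_ + _)

    // Bucket items by the order of magnitude of their degree
    // (e.g. bucket 5 holds items connected to 10^5 .. 10^6 - 1 users)
    val degreeHistogram = itemDegrees
      .map { case (_, degree) => (math.log10(degree.toDouble).toInt, 1) }
      .reduceByKey(_ + _)
      .collect()
      .sortBy(_._1)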