Hello everyone. I'm running Apache Spark MLlib's ALS matrix factorization and I ran into the following exceptions:
*The following exception is periodic: it starts on the first iteration with the OOM error and is then followed by a long series of FileNotFound exceptions during stage resubmissions (according to the UI, stage 12.0 is the first iteration stage). After 4 retries, the job indeed fails and gets aborted.*

16/08/31 23:53:03 WARN TaskSetManager: Lost task 12.0 in stage 2.0 (TID 3312, cloud-15): java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.lang.Integer.valueOf(Integer.java:832)
    at scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:70)
    at scala.runtime.ScalaRunTime$.array_apply(ScalaRunTime.scala:73)
    at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.copyElement(ALS.scala:1059)
    at org.apache.spark.ml.recommendation.ALS$UncompressedInBlockSort.copyElement(ALS.scala:1000)
    at org.apache.spark.util.collection.TimSort$SortState.mergeLo(TimSort.java:735)
    at org.apache.spark.util.collection.TimSort$SortState.mergeAt(TimSort.java:525)
    at org.apache.spark.util.collection.TimSort$SortState.mergeCollapse(TimSort.java:453)
    at org.apache.spark.util.collection.TimSort$SortState.access$200(TimSort.java:325)
    at org.apache.spark.util.collection.TimSort.sort(TimSort.java:153)
    at org.apache.spark.util.collection.Sorter.sort(Sorter.scala:37)
    at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.org$apache$spark$ml$recommendation$ALS$UncompressedInBlock$$sort(ALS.scala:971)
    at org.apache.spark.ml.recommendation.ALS$UncompressedInBlock.compress(ALS.scala:929)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$15.apply(ALS.scala:1114)
    at org.apache.spark.ml.recommendation.ALS$$anonfun$15.apply(ALS.scala:1108)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$43$$anonfun$apply$44.apply(PairRDDFunctions.scala:755)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:285)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

org.apache.spark.shuffle.FetchFailedException: Error in opening FileSegmentManagedBuffer{file=/data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data, offset=5194506, length=48945}
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:307)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:152)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:69)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Error in opening FileSegmentManagedBuffer{file=/data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data, offset=5194506, length=48945}
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:113)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:304)
    ... 26 more
Caused by: java.io.FileNotFoundException: /data/1/peel/spark/tmp/spark-3009db19-3f4a-43ae-825a-e241b533aaf9/executor-5fb32216-a9dd-4e62-b6aa-1e2d0a5910b1/blockmgr-3c845e0e-b832-42c5-8cf3-555d0c3542c0/02/shuffle_0_488_0.data (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.createInputStream(FileSegmentManagedBuffer.java:98)
    ... 27 more

16/09/01 00:54:51 WARN TaskSetManager: Lost task 304.0 in stage 12.0 (TID 9286, cloud-22): java.lang.OutOfMemoryError: Java heap space
    at java.io.ObjectOutputStream$HandleTable.growEntries(ObjectOutputStream.java:2347)
    at java.io.ObjectOutputStream$HandleTable.assign(ObjectOutputStream.java:2276)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1323)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.SerializationStream.writeValue(Serializer.scala:147)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:185)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:206)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.spill(ExternalAppendOnlyMap.scala:55)
    at org.apache.spark.util.collection.Spillable$class.maybeSpill(Spillable.scala:93)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.maybeSpill(ExternalAppendOnlyMap.scala:55)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:158)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:45)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:89)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:140)
    at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:136)

16/09/01 00:55:20 INFO TaskSetManager: Starting task 375.0 in stage 12.0 (TID 9360, cloud-15, partition 375,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:55:20 INFO TaskSetManager: Finished task 91.0 in stage 12.0 (TID 9093) in 1356978 ms on cloud-15 (120/720)
16/09/01 00:56:07 INFO TaskSetManager: Starting task 384.0 in stage 12.0 (TID 9361, cloud-24, partition 384,PROCESS_LOCAL, 2268 bytes)
16/09/01 00:56:07 WARN TaskSetManager: Lost task 18.0 in stage 12.0 (TID 9027, cloud-24): FetchFailed(BlockManagerId(2, cloud-22, 40528), shuffleId=22, mapId=671, reduceId=18, message=
org.apache.spark.shuffle.FetchFailedException: java.io.FileNotFoundException: /data/1/peel/spark/tmp/spark-6225354a-22f0-45dd-aff0-76051ad609ed/executor-d5fbc621-341c-4fc9-bedc-c292dc7f038a/blockmgr-c8b40f38-99a9-4060-823d-50b502bd9f91/25/shuffle_22_671_0.index (No such file or directory)
    at java.io.FileInputStream.open0(Native Method)
    at java.io.FileInputStream.open(FileInputStream.java:195)
    at java.io.FileInputStream.<init>(FileInputStream.java:138)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.getBlockData(IndexShuffleBlockResolver.scala:191)
    at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:298)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
    at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:58)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)

I'm running with *spark-1.6.2*. I really can't figure out the reason behind this. My code simply calls the library as follows:

val als = new ALS()
  .setIntermediateRDDStorageLevel(storageLevel)
  .setBlocks(numTasks)
  .setLambda(0.1)
  .setRank(50)
  .setIterations(10)
  .setSeed(42)

val model = als.run(ratings)
model.save(sc, outputPath)
sc.stop()

where
- *ratings*, the input RDD (parallelized with *numTasks* partitions), contains (uid, iid, rate) rows: about 8e6 users, 1e6 items and about 5.6e9 ratings (700 per user on average)
- *numTasks*: currently 240 * 3 (= numOfCores * 3)
- *storageLevel*: MEMORY_AND_DISK

(A self-contained sketch of how these pieces fit together is in the P.S. below.)

I tried several things:
- lowering the number of blocks: 1 - numTasks, 2 - 240 (numOfCores), 3 - letting the MLlib implementation choose it
- changing the storage level to MEMORY_ONLY

I would try varying spark.shuffle.memoryFraction as well, but I read that it is deprecated since Spark 1.6.

I'm running on a 15-node cluster - 16 CPUs per node, 32GB of memory per node - with the following relevant properties:

spark.executor.memory = 28672m
spark.driver.memory = 28672m
spark.deamon.memory = 28672m
spark.driver.maxResultSize = 0
spark.network.timeout = 3000s

Any help will be appreciated. Thank you.

--
*Andrea Spina*
N.Tessera: *74598* MAT: *89369*
*Ingegneria Informatica* *[LM]* (D.M. 270)
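P.S. In case it helps to see everything in one place, here is a minimal, self-contained sketch of what the job does. The input-loading part is not shown above, so the file paths, the input format and the parsing code below are placeholders/assumptions rather than my actual code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.storage.StorageLevel

val sc = new SparkContext(new SparkConf().setAppName("als-factorization"))

val numTasks = 240 * 3                            // numOfCores * 3
val storageLevel = StorageLevel.MEMORY_AND_DISK

// Hypothetical input parsing: one "uid,iid,rate" triple per line.
val ratings = sc.textFile("hdfs:///path/to/ratings.csv", numTasks).map { line =>
  val Array(uid, iid, rate) = line.split(',')
  Rating(uid.toInt, iid.toInt, rate.toDouble)
}

val als = new ALS()
  .setIntermediateRDDStorageLevel(storageLevel)
  .setBlocks(numTasks)
  .setLambda(0.1)
  .setRank(50)
  .setIterations(10)
  .setSeed(42)

val model = als.run(ratings)
model.save(sc, "hdfs:///path/to/als-model")       // outputPath in the real job
sc.stop()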