[ https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15182472#comment-15182472 ]
Taro L. Saito edited comment on SPARK-5928 at 3/7/16 2:24 AM:
--------------------------------------------------------------

FYI. I created the LArray library, which can handle data larger than 2GB, the limit of Java byte arrays and mmap files: https://github.com/xerial/larray

It looks like there are several reports showing that this 2GB limit can be problematic, especially in Spark SQL processing: http://www.slideshare.net/SparkSummit/top-5-mistakes-when-writing-spark-applications-by-mark-grover-and-ted-malaska/29

Let me know if there is anything I can work on.


Remote Shuffle Blocks cannot be more than 2 GB
----------------------------------------------

                Key: SPARK-5928
                URL: https://issues.apache.org/jira/browse/SPARK-5928
            Project: Spark
         Issue Type: Bug
         Components: Spark Core
           Reporter: Imran Rashid

If a shuffle block is over 2GB, the shuffle fails with an uninformative exception. The tasks get retried a few times and then the job eventually fails.

Here is an example program which can cause the exception:

{code}
val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
  val n = 3e3.toInt
  val arr = new Array[Byte](n)
  // need to make sure the array doesn't compress to something small
  scala.util.Random.nextBytes(arr)
  arr
}
rdd.map { x => (1, x) }.groupByKey().count()
{code}

Note that you can't trigger this exception in local mode; it only happens on remote fetches.
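To spell out the arithmetic: that single partition holds 10^6 values of 3,000 random (hence incompressible) bytes each, i.e. roughly 3GB in one shuffle block, well past the 2,147,483,647-byte ({{Int.MaxValue}}) ceiling. A minimal sketch of why that ceiling exists on the JVM; the object name is illustrative, and the size is taken from the trace below:

{code}
object ArrayLimitDemo {
  def main(args: Array[String]): Unit = {
    // Java arrays are indexed by Int, so Int.MaxValue (2147483647) elements
    // is a hard upper bound on Array[Byte]; most JVMs reject even slightly
    // smaller requests with "Requested array size exceeds VM limit".
    val blockSize = 3021252889L        // frame size reported in the trace below
    println(blockSize > Int.MaxValue)  // true: cannot fit in a single Array[Byte]

    // Naively narrowing to Int overflows to a negative length, which
    // new Array[Byte](...) rejects with NegativeArraySizeException.
    println(blockSize.toInt)           // -1273714407
  }
}
{code}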
I triggered these exceptions running with {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}:

{noformat}
15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
    at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    ... 1 more
)
{noformat}
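The "Adjusted frame length exceeds 2147483647" error comes from Netty's {{LengthFieldBasedFrameDecoder}}, whose {{maxFrameLength}} parameter is an {{int}}: the transport simply cannot express a frame larger than {{Integer.MAX_VALUE}} bytes. A minimal sketch that reproduces the exception in isolation; the frame layout (an 8-byte length field that counts itself, hence {{lengthAdjustment = -8}}) is an assumption for the demo, not taken from the Spark transport code in the trace:

{code}
import io.netty.buffer.Unpooled
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.LengthFieldBasedFrameDecoder

object FrameLimitDemo {
  def main(args: Array[String]): Unit = {
    // maxFrameLength is an Int, so 2147483647 bytes is the largest frame the
    // decoder can ever accept, no matter how it is configured.
    val channel = new EmbeddedChannel(
      new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 8, -8, 8))

    // Declare the 3021252889-byte frame reported in the trace above.
    val header = Unpooled.buffer(8)
    header.writeLong(3021252889L)

    // Throws io.netty.handler.codec.TooLongFrameException:
    // "Adjusted frame length exceeds 2147483647: 3021252889 - discarded"
    channel.writeInbound(header)
  }
}
{code}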
Or, if you use {{spark.shuffle.blockTransferService=nio}}, then you get:

{noformat}
15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
    at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
    at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
    at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
    at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
    at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
    at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
    at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
    at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
    at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
    at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
    at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
    at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
    ... 3 more
)
{noformat}
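The NIO path fails for an analogous reason: {{FileSegmentManagedBuffer.nioByteBuffer}} memory-maps the whole block, and {{FileChannel.map}} must return a {{MappedByteBuffer}}, whose positions are {{Int}}s, so it rejects any region longer than {{Integer.MAX_VALUE}}. A minimal sketch; the temp-file name is illustrative, and since the size check fires before the file is even read, an empty file suffices:

{code}
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel

object MmapLimitDemo {
  def main(args: Array[String]): Unit = {
    // An empty temp file is enough: FileChannel.map validates the requested
    // size before touching the file contents.
    val f = File.createTempFile("shuffle-block", ".data")
    f.deleteOnExit()
    val ch = new RandomAccessFile(f, "r").getChannel
    try {
      // Throws java.lang.IllegalArgumentException: "Size exceeds
      // Integer.MAX_VALUE" for any length over 2147483647 -- the same
      // failure FileSegmentManagedBuffer.nioByteBuffer hits above.
      ch.map(FileChannel.MapMode.READ_ONLY, 0, 3021252889L)
    } finally {
      ch.close()
    }
  }
}
{code}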