[ https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376430#comment-14376430 ]
Imran Rashid commented on SPARK-5928:
-------------------------------------

Sorry to hear that, [~douglaz]. To help understand and prioritize this, can you share a bit more info?

a) How much data were you shuffling?
b) Were you able to fix this by increasing the number of partitions? How many partitions did you need in the end?
c) Did you get a mix of snappy errors as well?
d) Did you also run into SPARK-5945 as a result of your failures?

Thanks.

> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2 GB, the shuffle fails with an uninformative exception. The tasks get retried a few times and then the job eventually fails.
> Here is an example program which can cause the exception:
> {code}
> val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
>   val n = 3e3.toInt
>   val arr = new Array[Byte](n)
>   // make sure the array doesn't compress to something small
>   scala.util.Random.nextBytes(arr)
>   arr
> }
> rdd.map { x => (1, x) }.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode; it only happens on remote fetches. I triggered these exceptions running with {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}.
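> The sizes here are what trigger the failure: 1e6 records of roughly 3 KB each is about 3 GB of incompressible data, and {{groupByKey}} with a single key sends all of it to one reduce partition, so the lone shuffle block (3021252889 bytes in the traces below) crosses the 2^31 - 1 byte limit. As a workaround sketch (assuming the same synthetic {{rdd}} as above; this works around, rather than fixes, the underlying limit), spreading the records over many keys and partitions keeps every block far under 2 GB:
> {code}
> // Hypothetical workaround: salt the key and raise the partition count so
> // each shuffle block is ~3 GB / 1000 ≈ 3 MB instead of one ~3 GB block.
> val numPartitions = 1000
> rdd.map { x => (scala.util.Random.nextInt(numPartitions), x) }
>    .groupByKey(numPartitions)
>    .count()
> {code}
> With the original single-partition job, the fetch fails like this: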
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>     at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>     at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
>     at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
>     at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
>     at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
>     at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
>     at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>     at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>     at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>     at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>     ... 1 more
> )
> {noformat}
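> The {{TooLongFrameException}} above comes out of netty's length-field frame decoder: the maximum frame length is an {{int}}, so any frame whose length prefix exceeds 2147483647 bytes is discarded before the block payload is ever read. A paraphrased sketch of that check (not netty's exact source):
> {code}
> // The frame length travels as a Long, but the decoder's cap is an Int,
> // so no single fetched block can exceed 2^31 - 1 bytes.
> val maxFrameLength: Int = Int.MaxValue       // 2147483647
> val adjustedFrameLength: Long = 3021252889L  // the block size from the log above
> if (adjustedFrameLength > maxFrameLength) {
>   throw new io.netty.handler.codec.TooLongFrameException(
>     s"Adjusted frame length exceeds $maxFrameLength: $adjustedFrameLength - discarded")
> }
> {code}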
> Or, if you use {{spark.shuffle.blockTransferService=nio}}, then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>     at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>     at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>     at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>     at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>     at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>     at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>     at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>     at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>     at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>     at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>     at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>     at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>     at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>     at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>     at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>     at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>     at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>     at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>     at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>     at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>     at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>     at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>     at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
>     at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
>     at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
>     at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
>     at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
>     at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>     ... 3 more
> )
> {noformat}
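> The nio path hits the same 2 GB ceiling through {{java.nio}}: {{FileChannel.map}} takes a {{long}} size but rejects anything larger than {{Integer.MAX_VALUE}}, so a shuffle block over 2 GB cannot be memory-mapped as a single {{MappedByteBuffer}} in {{FileSegmentManagedBuffer.nioByteBuffer}}. A minimal sketch of just that limit, assuming a hypothetical block file at {{/tmp/shuffle_block}} that is larger than 2 GB:
> {code}
> import java.io.RandomAccessFile
> import java.nio.channels.FileChannel
>
> val channel = new RandomAccessFile("/tmp/shuffle_block", "r").getChannel
> // Throws java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> // whenever channel.size() > Int.MaxValue (2147483647 bytes):
> val buf = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
> {code}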