[ https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908705#comment-14908705 ]

Imran Rashid commented on SPARK-5928:
-------------------------------------

[~ariskk] The workaround is to increase the number of partitions. All of the operations that trigger a shuffle take an optional second argument with the number of partitions, e.g., {{reduceByKey(reduceFunc, numPartitions)}}. In general, it's best to err on the side of too many partitions rather than too few. My rule of thumb is to size partitions to hold roughly 100 MB of data (I have heard others throw around numbers in roughly the same ballpark). Note that this means using a lot of partitions when the data is large: shuffling, say, 1 TB of data at ~100 MB per partition works out to roughly 10,000 partitions (a quick sizing sketch follows the quoted issue below).

It's worth noting that if you have very skewed data, just increasing the number of partitions in the function that triggers the shuffle might not help. That controls the number of partitions on the shuffle-read (aka reduce) side, but not the shuffle-write (aka map) side. If one map task writes out 2 GB of data for one key, increasing the number of reduce partitions won't help you: no matter how many reduce partitions there are, you will still write 2 GB into one shuffle block. (A shuffle block corresponds to one map task / reduce task pair.) In that case, you may want to increase the number of partitions for your *map* stage, so that each map task writes less data for any one key. You control the number of partitions for the map stage either at the previous operation that triggered a shuffle (e.g., a preceding {{reduceByKey}}) or at the operation that loaded the data (e.g., {{sc.textFile}}). For example:

{noformat}
val rawData = sc.textFile(..., numPartitionsFirstStage) // control the "map" partitions here
val afterShuffle = rawData.map{...}.reduceByKey( ..., numPartitionsSecondStage) // control the "reduce" partitions here
{noformat}

My general recommendation, if you want to re-use your code and have it work on data sets of varying sizes, is to make the number of partitions at *every* stage an easily controllable parameter (e.g., via the command line), so you can tweak things without having to recompile your code. A minimal sketch of this also follows the quoted issue below.

> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2 GB, the shuffle fails with an uninformative exception. The tasks get retried a few times and then eventually the job fails.
> Here is an example program which can cause the exception:
> {code}
> val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>   val n = 3e3.toInt
>   val arr = new Array[Byte](n)
>   // need to make sure the array doesn't compress to something small
>   scala.util.Random.nextBytes(arr)
>   arr
> }
> rdd.map { x => (1, x)}.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode; it only happens on remote fetches.
> I triggered these exceptions running with
> {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>   at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>   at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
>   at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
>   at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
>   at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>   at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>   at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>   at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>   at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>   at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>   at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>   ... 1 more
> )
> {noformat}
> or if you use "spark.shuffle.blockTransferService=nio", then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>   at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>   at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>   at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>   at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>   at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>   at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>   at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>   at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>   at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>   at org.apache.spark.scheduler.Task.run(Task.scala:56)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>   at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>   at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>   at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>   at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>   at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>   at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
>   at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
>   at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
>   at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
>   at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
>   at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>   ... 3 more
> )
> {noformat}
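To make the ~100 MB-per-partition rule of thumb above concrete, here is a minimal sizing sketch. {{suggestedPartitions}} is a hypothetical helper for illustration, not a Spark API; the 100 MB target is just the ballpark figure from the comment.

{code}
// Hypothetical helper: derive a partition count from the estimated shuffle
// data size, using the ~100 MB-per-partition rule of thumb.
def suggestedPartitions(
    totalShuffleBytes: Long,
    targetBytesPerPartition: Long = 100L * 1024 * 1024): Int =
  // round up so the last partition is never oversized
  math.max(1, math.ceil(totalShuffleBytes.toDouble / targetBytesPerPartition).toInt)

suggestedPartitions(1024L * 1024 * 1024 * 1024)  // 1 TB of shuffle data => 10486 partitions
{code}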
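And a minimal sketch of the closing recommendation, with every stage's partition count passed in on the command line. The object name, input/output paths, and key extraction are illustrative assumptions; the point being demonstrated is only that {{sc.textFile}} and {{reduceByKey}} each take an optional partition-count argument.

{code}
import org.apache.spark.{SparkConf, SparkContext}

object PartitionTuningExample {
  def main(args: Array[String]): Unit = {
    // Both stage sizes come from the command line, so tuning for a new
    // input size needs no recompile.
    val numPartitionsFirstStage  = args(0).toInt  // controls the "map" side partitions
    val numPartitionsSecondStage = args(1).toInt  // controls the "reduce" side partitions

    val sc = new SparkContext(new SparkConf().setAppName("partition-tuning"))
    sc.textFile("hdfs:///path/to/input", numPartitionsFirstStage)  // illustrative path
      .map(line => (line.takeWhile(_ != ','), 1L))                 // illustrative key extraction
      .reduceByKey(_ + _, numPartitionsSecondStage)
      .saveAsTextFile("hdfs:///path/to/output")                    // illustrative path
    sc.stop()
  }
}
{code}

Run, for example, as {{spark-submit --class PartitionTuningExample app.jar 10000 10000}}; re-tuning for a larger or smaller data set is then just a change of arguments.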