[ 
https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14908705#comment-14908705
 ] 

Imran Rashid commented on SPARK-5928:
-------------------------------------

[~ariskk] The workaround is to increase the number of partitions.  All of the 
operations which trigger a shuffle take an optional second argument with the 
number of partitions, eg., {{reduceByKey( reduceFunc, numPartitions)}}.  In 
general, its best to err on the side of too many partitions, rather than too 
few.  My rule of thumb is to try to size partitions to to have roughly 100 MB 
of data (I have heard others throw around numbers in roughly the same 
ballpark).  Note that means you use a lot of partitions if you have say 1 TB of 
data you are shuffling.

Its worth noting that if you have very skewed data, just increasing the number 
of partitions in the function that triggers the shuffle might not help.  That 
controls the number of partitions on the shuffle-read (aka reduce) side, but 
not the shuffle-write (aka map) side.  If one map task writes out 2GB of data 
for one key, increasing the number of reduce partitions won't help you, since 
no matter how many reduce partitions, you will still write 2GB into one shuffle 
block.  (A shuffle block corresponds to one map task / reduce task pair.)  In 
that case, you may want to increase the number of partitions for your *map* 
stage, so that it is writing less data to one particular key.  You control the 
number of partitions for the map-stage either at the previous operation that 
triggered a shuffle (eg., a preceding {{reduceByKey}}), or the operation that 
loaded the data (eg, {{sc.textFile}}).  Eg:

{noformat}
val rawData = sc.textFile(..., numPartitionsFirstStage) // control the "map" 
partitions here
val afterShuffle = rawData.map{...}.reduceByKey( ..., numPartitionsSecondStage) 
// control the "reduce" partitions here
{noformat}

My general recommendation, if you want to re-use your code, and have it work on 
a data sets of varying sizes, is to make the number of partitions at *every* 
stage some easily controllable parameter (eg., via the command line), so you 
can tweak things without having to recompile your code.

> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2GB, the shuffle fails, with an uninformative 
> exception.  The tasks get retried a few times and then eventually the job 
> fails.
> Here is an example program which can cause the exception:
> {code}
>     val rdd = sc.parallelize(1 to 1e6.toInt, 1).map{ ignore =>
>       val n = 3e3.toInt
>       val arr = new Array[Byte](n)
>       //need to make sure the array doesn't compress to something small
>       scala.util.Random.nextBytes(arr)
>       arr
>     }
>     rdd.map { x => (1, x)}.groupByKey().count()
> {code}
> Note that you can't trigger this exception in local mode, it only happens on 
> remote fetches.   I triggered these exceptions running with 
> {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, 
> imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, 
> imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 
> 2147483647: 3021252889 - discarded
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame 
> length exceeds 2147483647: 3021252889 - discarded
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
>       at 
> io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
>       at 
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>       at 
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>       at 
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>       at 
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>       at 
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>       at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>       at 
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>       ... 1 more
> )
> {noformat}
> or if you use "spark.shuffle.blockTransferService=nio", then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 
> imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, 
> imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed 
> with ACK that signalled a remote error: java.lang.IllegalArgumentException: 
> Size exceeds Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at 
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>       at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>       at 
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>       at 
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>       at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
>       at 
> org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
>       at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that 
> signalled a remote error: java.lang.IllegalArgumentException: Size exceeds 
> Integer.MAX_VALUE
>       at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
>       at 
> org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
>       at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>       at 
> org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>       at java.lang.Thread.run(Thread.java:745)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
>       at 
> org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
>       at 
> org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
>       at 
> org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
>       ... 3 more
> )
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to