Thanks Deng,

Yes, I agree that a partition larger than 2GB is what caused this exception.

But in my case, directly increasing the number of partitions in the sortBy operation did not actually help. I think the partitioning produced by sortBy is not balanced: my dataset contains a large number of elements sharing the same sort key, and these are all kept in a common partition, which grows quite large. So after sorting I called the repartition method to rebalance the partitions, and it works well; a rough sketch is below.
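For reference, here is a minimal sketch of what I mean (runnable in spark-shell, with toy data standing in for my real RDD; the key function, element counts and partition numbers are placeholders):

  import org.apache.spark.storage.StorageLevel

  // Toy stand-in for my real data: many duplicate sort keys, to mimic the skew.
  // (In spark-shell, sc is already defined.)
  val rdd = sc.parallelize(1L to 1000000L).map(x => (x % 10, x))

  // Sort as before (on the real data I used numPartitions = 40000).
  val sorted = rdd.sortBy(_._1, ascending = true, numPartitions = 100)

  // Quick skew check: count the elements in each partition. With duplicate
  // keys, a few partitions hold far more elements than the rest.
  sorted.mapPartitionsWithIndex((i, it) => Iterator((i, it.size)))
        .collect().sortBy(p => -p._2).take(5)
        .foreach { case (i, n) => println(s"partition $i: $n elements") }

  // Rebalance with a full shuffle so that no single cached block exceeds
  // the 2GB limit. Note: repartition does not preserve the sort order
  // across partitions.
  val balanced = sorted.repartition(100)
  balanced.persist(StorageLevel.DISK_ONLY)

  // Scan on the driver, fetching one partition at a time.
  var count = 0L
  balanced.toLocalIterator.foreach(_ => count += 1)

One caveat: since repartition reshuffles the elements, the globally sorted order is lost; the rebalance only fixes the oversized-block problem.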
On 30 October 2015 at 03:13, Deng Ching-Mallete <och...@apache.org> wrote:

> Hi Yifan,
>
> This is a known issue; please refer to
> https://issues.apache.org/jira/browse/SPARK-6235 for more details. In your
> case, it looks like you are caching to disk a partition > 2G. A workaround
> would be to increase the number of your RDD partitions in order to make
> them smaller in size.
>
> HTH,
> Deng
>
> On Thu, Oct 29, 2015 at 8:40 PM, Yifan LI <iamyifa...@gmail.com> wrote:
>
>> My guess is that before scanning that RDD I sorted it and set the
>> partitioning, so the result is not balanced:
>>
>> sortBy[S](f: Function[T, S], ascending: Boolean, numPartitions: Int)
>> <http://spark.apache.org/docs/latest/api/scala/org/apache/spark/api/java/function/Function.html>
>>
>> I will try to repartition it to see if it helps.
>>
>> Best,
>> Yifan LI
>>
>>
>> On 29 Oct 2015, at 12:52, Yifan LI <iamyifa...@gmail.com> wrote:
>>
>> Hey,
>>
>> I was just trying to scan a large RDD, sortedRdd, of ~1 billion elements
>> using the toLocalIterator API, but an exception was thrown when the scan
>> was almost finished:
>>
>> java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
>> at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:821)
>> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:125)
>> at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:113)
>> at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1285)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:127)
>> at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:134)
>> at org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:511)
>> at org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:302)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> at org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>> at org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:114)
>> at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:87)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:101)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:162)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:103)
>> at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:51)
>> at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:254)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>> at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>> at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> Driver stacktrace:
>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
>> at scala.Option.foreach(Option.scala:236)
>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
>> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>>
>> Do you have any idea?
>>
>> I have already set the partitioning quite high, like 40000.
>>
>> Best,
>> Yifan LI