However, Executors were dying when using Netty as well, so it is possible
that the OOM was occurring then too. It is also possible only one of your
Executors OOMs (due to a particularly large task) and the others display
various exceptions while trying to fetch the shuffle blocks from the failed
executor.

I cannot explain the local FileNotFoundExceptions occurring on machines that
were not throwing fatal errors, though -- typically I have only seen that
happen when a fatal error (e.g., OOM) was thrown on an Executor, causing it
to begin the termination process which involves deleting its own shuffle
files. It may then throw the FNF if other Executors request those files
before it has completed its shutdown (and will throw a ConnectionFailed
once it's completed terminating).
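
To check for this, it can help to scan each executor log from the top and report the first fatal error, since the later FileNotFoundException and connection errors are often just fallout. Below is a minimal sketch in Python. The function name `first_fatal_error` and the list of fatal markers are assumptions made for illustration; they are not part of Spark.

```python
from typing import Iterable, Optional, Tuple

# Substrings treated as fatal. This list is an assumption; extend it as needed.
FATAL_MARKERS = ("java.lang.OutOfMemoryError", "java.lang.StackOverflowError")


def first_fatal_error(lines: Iterable[str]) -> Optional[Tuple[int, str]]:
    """Return (1-based line number, line) of the first fatal error, or None."""
    for number, line in enumerate(lines, start=1):
        if any(marker in line for marker in FATAL_MARKERS):
            return number, line.rstrip("\n")
    return None


if __name__ == "__main__":
    # Illustrative input only: a few log lines in the style of this thread.
    sample = [
        "15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207)",
        "java.lang.OutOfMemoryError: Java heap space",
        "15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive()",
    ]
    print(first_fatal_error(sample))
```

For a real log, pass an open file object instead of the sample list, since the function only needs an iterable of lines.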

On Thu, Mar 5, 2015 at 12:19 AM, Shao, Saisai <saisai.s...@intel.com> wrote:

>  I have no idea why Netty didn't hit the OOM issue. One possibility is that
> Netty uses direct memory to hold each block, whereas NIO uses on-heap
> memory, so Netty occupies less on-heap memory than NIO.
>
>
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 4:14 PM
> *To:* Shao, Saisai
> *Cc:* Cheng, Hao; user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> Thanks. I was about to submit a ticket for this :)
>
>
>
> Also there's a ticket for sort-merge based groupbykey
> https://issues.apache.org/jira/browse/SPARK-3461
>
>
>
> BTW, any idea why running with netty didn't output OOM error messages? It
> makes troubleshooting very confusing.
>
>
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>
>  I think there are several JIRAs trying to solve this problem (e.g.
> https://issues.apache.org/jira/browse/SPARK-5763). Basically, sort-merge
> join is a good choice.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:55 PM
> *To:* Shao, Saisai
> *Cc:* Cheng, Hao; user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> There is some skew.
>
>
>
> 64  6164  0  SUCCESS  PROCESS_LOCAL  200 / xxxx  2015/03/04 23:45:47  1.1 min  6 s  198.6 MB  21.1 GB  240.8 MB
> 59  6159  0  SUCCESS  PROCESS_LOCAL  30 / xxxx   2015/03/04 23:45:47  44 s     5 s  200.7 MB  4.8 GB   154.0 MB
>
> But I expect this kind of skewness to be quite common.
>
>
>
> Jianshi
>
>
>
>
>
> On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  I see. I'm using core's join. The data might have some skewness
> (checking).
>
>
>
> I understand shuffle can spill data to disk, but when consuming it, say in
> cogroup or groupByKey, it still needs to read all the elements of a group,
> right? I guess the OOM happened there when reading very large groups.
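
A quick way to test that guess is to count records per join key and compare the largest group with the average. The sketch below is plain Python over an in-memory list of keys, not Spark code. The function names `largest_groups` and `skew_ratio` are hypothetical. On an RDD, comparable counts could come from something like countByKey on a sample of the data.

```python
from collections import Counter
from typing import Hashable, Iterable, List, Tuple


def largest_groups(keys: Iterable[Hashable], top: int = 3) -> List[Tuple[Hashable, int]]:
    """Return the `top` most frequent keys together with their record counts."""
    return Counter(keys).most_common(top)


def skew_ratio(keys: Iterable[Hashable]) -> float:
    """Largest group size divided by the mean group size (1.0 means no skew)."""
    counts = Counter(keys)
    if not counts:
        return 0.0
    mean = sum(counts.values()) / len(counts)
    return max(counts.values()) / mean


if __name__ == "__main__":
    # Toy data: one hot key and two rare keys.
    keys = ["a"] * 8 + ["b", "c"]
    print(largest_groups(keys, top=1))
    print(skew_ratio(keys))
```

A ratio far above 1 suggests that a few groups dominate, which is the case where a single group may not fit in memory.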
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>
>  I think you could monitor the web UI to see whether there is any skew or
> other symptom in shuffle write and read. For GC, you could use the
> configuration you mentioned below.
>
>
>
> On the Spark core side, all the shuffle-related operations can spill data
> to disk, so there is no need to read the whole partition into memory. But
> if you use SparkSQL, it depends on how SparkSQL uses these operators.
>
>
>
> CC @hao if he has some thoughts on it.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:28 PM
> *To:* Shao, Saisai
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> Hi Saisai,
>
>
>
> What are your suggested settings for monitoring shuffle? I've
> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
>
>
>
> I found SPARK-3461 (Support external groupByKey using
> repartitionAndSortWithinPartitions), which aims to make groupByKey use
> external storage. It is still open. Does that mean
> groupByKey/cogroup/join (implemented as cogroup + flatmap) will still read
> each group as a whole when consuming it?
>
>
>
> How can I deal with the key skewness in joins? Is there a skew-join
> implementation?
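
One common workaround, independent of any built-in support, is key salting: each record on the large side gets a random salt appended to its key, the small side is replicated once per salt value, and the join runs on the (key, salt) pair. The sketch below shows the idea in plain Python on in-memory lists. It is not Spark code, and `salted_join` and its parameters are hypothetical names chosen for illustration.

```python
import random
from collections import defaultdict
from typing import Dict, Hashable, List, Tuple

Pair = Tuple[Hashable, object]
Joined = Tuple[Hashable, Tuple[object, object]]


def salted_join(big: List[Pair], small: List[Pair], salts: int, seed: int = 0) -> List[Joined]:
    """Inner-join `big` and `small` on key, spreading each key over `salts` buckets."""
    rng = random.Random(seed)

    # Replicate every record of the small side once per salt value.
    small_by_bucket: Dict[Tuple[Hashable, int], List[object]] = defaultdict(list)
    for key, value in small:
        for salt in range(salts):
            small_by_bucket[(key, salt)].append(value)

    # Give each record of the big side one random salt, so a hot key is split
    # across up to `salts` buckets instead of forming a single huge group.
    big_by_bucket: Dict[Tuple[Hashable, int], List[object]] = defaultdict(list)
    for key, value in big:
        big_by_bucket[(key, rng.randrange(salts))].append(value)

    # Join bucket by bucket, then drop the salt from the output key.
    joined: List[Joined] = []
    for (key, salt), left_values in big_by_bucket.items():
        for left in left_values:
            for right in small_by_bucket.get((key, salt), []):
                joined.append((key, (left, right)))
    return joined


if __name__ == "__main__":
    big = [("hot", i) for i in range(6)] + [("cold", 0)]
    small = [("hot", "x"), ("cold", "y")]
    print(sorted(salted_join(big, small, salts=3)))
```

The result has the same rows as a plain inner join. The trade-off is that the small side grows by a factor of `salts`, so this only pays off when one side is much smaller than the other.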
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>
>  Hi Jianshi,
>
>
>
> From my understanding, this may not be a problem with NIO or Netty. Looking
> at your stack trace, the OOM occurred in EAOM (ExternalAppendOnlyMap).
> Theoretically EAOM can spill data to disk when memory is insufficient, but
> there might be issues when the join key is skewed or the number of keys is
> small, so you hit the OOM.
>
>
>
> Maybe you could monitor each stage's or task's shuffle and GC status, as
> well as system status, to identify the problem.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 2:32 PM
> *To:* Aaron Davidson
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> One really interesting thing is that when I'm using the
> netty-based spark.shuffle.blockTransferService, there are no OOM error
> messages (java.lang.OutOfMemoryError: Java heap space).
>
>
>
> Any idea why it's not here?
>
>
>
> I'm using Spark 1.2.1.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  I changed spark.shuffle.blockTransferService to nio and now I'm getting
> OOM errors. I'm doing a big join operation.
>
>
>
>
>
> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0 (TID 6207)
> java.lang.OutOfMemoryError: Java heap space
>         at org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
>         at org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
>         at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
>         at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>         at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>         at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
>
>
> Is join/cogroup still memory bound?
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  Hmm... ok, previous errors are still block fetch errors.
>
>
>
> 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 11 outstanding blocks
>
> java.io.IOException: Failed to connect to host-xxxx/xxxx:55597
>
>         at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>
>         at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>
>         at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>
>         at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>
>         at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>
>         at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
>
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
>
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
>         at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>
>         at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
>         at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>         at java.lang.Thread.run(Thread.java:724)
>
> Caused by: java.net.ConnectException: Connection refused:
> lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597
>
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>
>         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>
>         at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>
>         at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>
>
>
> And I checked the executor on container host-xxxx; everything looks good.
>
>
>
> Jianshi
>
>
>
>
>
> On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson <ilike...@gmail.com>
> wrote:
>
>  Drat! That doesn't help. Could you scan from the top to see if there
> were any fatal errors preceding these? Sometimes an OOM will cause this type
> of issue further down.
>
>
>
> On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  The failed executor has the following error messages. Any hints?
>
>
>
> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 5711039715419258699
>
> java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>
>         at java.lang.Thread.run(Thread.java:724)
>
> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 7941985280808455530
>
> java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>
>         at java.lang.Thread.run(Thread.java:724)
>
> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 5413737659722448543
>
> java.io.FileNotFoundException:
> /hadoop03/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-88ee/17/shuffle_0_1074_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>
>         at java.lang.Thread.run(Thread.java:724)
>
>
>
>
>
> Jianshi
>
>
>
> On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson <ilike...@gmail.com> wrote:
>
>  "Failed to connect" implies that the executor at that host died, please
> check its logs as well.
>
>
>
> On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  Sorry that I forgot the subject.
>
>
>
> And in the driver, I got many FetchFailedException. The error messages are
>
>
>
> 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID 7943, xxxx): FetchFailed(BlockManagerId(86, xxxx, 43070), shuffleId=0, mapId=24, reduceId=1220, message=
> org.apache.spark.shuffle.FetchFailedException: Failed to connect to xxxx/xxxx:43070
>         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>         at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>
>
>
>
>
> Jianshi
>
>
>
> On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  Hi,
>
>
>
> I got this error message:
>
>
>
> 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>
>
>
>
> And then for the same index file and executor, I got the following errors
> multiple times
>
>
>
> 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
> block(s) from host-xxxx:39534
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>
>
> 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
> shuffle_0_13_1228, and will not retry (0 retries)
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>
>
> ...
>
> Caused by: java.net.ConnectException: Connection refused: host-xxxx....
>
>
>
>
>
> What's the problem?
>
>
>
> BTW, I'm using a Spark 1.2.1-SNAPSHOT that I built around Dec. 20. Have
> there been any bug fixes related to shuffle block fetching or index files
> since then?
>
>
>
>
>  Thanks,
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
