Thanks. I was about to submit a ticket for this :)

Also there's a ticket for sort-merge based groupbykey
https://issues.apache.org/jira/browse/SPARK-3461

BTW, any idea why run with netty didn't output OOM error messages? It's
very confusing in troubleshooting.


Jianshi

On Thu, Mar 5, 2015 at 4:01 PM, Shao, Saisai <saisai.s...@intel.com> wrote:

>  I think there’s a lot of JIRA trying to solve this problem (
> https://issues.apache.org/jira/browse/SPARK-5763). Basically sort merge
> join is a good choice.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:55 PM
> *To:* Shao, Saisai
> *Cc:* Cheng, Hao; user
>
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> There're some skew.
>
>
>
> 64
>
> 6164
>
> 0
>
> SUCCESS
>
> PROCESS_LOCAL
>
> 200 / xxxx
>
> 2015/03/04 23:45:47
>
> 1.1 min
>
> 6 s
>
> 198.6 MB
>
> 21.1 GB
>
> 240.8 MB
>
> 59
>
> 6159
>
> 0
>
> SUCCESS
>
> PROCESS_LOCAL
>
> 30 / xxxx
>
> 2015/03/04 23:45:47
>
> 44 s
>
> 5 s
>
> 200.7 MB
>
> 4.8 GB
>
> 154.0 MB
>
> But I expect this kind of skewness to be quite common.
>
>
>
> Jianshi
>
>
>
>
>
> On Thu, Mar 5, 2015 at 3:48 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  I see. I'm using core's join. The data might have some skewness
> (checking).
>
>
>
> I understand shuffle can spill data to disk but when consuming it, say in
> cogroup or groupByKey, it still needs to read the whole group elements,
> right? I guess OOM happened there when reading very large groups.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 3:38 PM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>
>  I think what you could do is to monitor through web UI to see if there’s
> any skew or other symptoms in shuffle write and read. For GC you could use
> the below configuration as you mentioned.
>
>
>
> From Spark core side, all the shuffle related operations can spill the
> data into disk and no need to read the whole partition into memory. But if
> you uses SparkSQL, it depends on how SparkSQL uses this operators.
>
>
>
> CC @hao if he has some thoughts on it.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 3:28 PM
> *To:* Shao, Saisai
>
>
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> Hi Saisai,
>
>
>
> What's your suggested settings on monitoring shuffle? I've
> enabled -XX:+PrintGCDetails -XX:+PrintGCTimeStamps for GC logging.
>
>
>
> I found SPARK-3461 (Support external groupByKey using
> repartitionAndSortWithinPartitions) want to make groupByKey using external
> storage. It's still open status. Does that mean now
> groupByKey/cogroup/join(implemented as cogroup + flatmap) will still read
> the group as a whole during consuming?
>
>
>
> How can I deal with the key skewness in joins? Is there a skew-join
> implementation?
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Thu, Mar 5, 2015 at 2:44 PM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>
>  Hi Jianshi,
>
>
>
> From my understanding, it may not be the problem of NIO or Netty, looking
> at your stack trace, the OOM is occurred in EAOM(ExternalAppendOnlyMap),
> theoretically EAOM can spill the data into disk if memory is not enough,
> but there might some issues when join key is skewed or key number is
> smaller, so you will meet OOM.
>
>
>
> Maybe you could monitor each stage or task’s shuffle and GC status also
> system status to identify the problem.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Jianshi Huang [mailto:jianshi.hu...@gmail.com]
> *Sent:* Thursday, March 5, 2015 2:32 PM
> *To:* Aaron Davidson
> *Cc:* user
> *Subject:* Re: Having lots of FetchFailedException in join
>
>
>
> One really interesting is that when I'm using the
> netty-based spark.shuffle.blockTransferService, there's no OOM error
> messages (java.lang.OutOfMemoryError: Java heap space).
>
>
>
> Any idea why it's not here?
>
>
>
> I'm using Spark 1.2.1.
>
>
>
> Jianshi
>
>
>
> On Thu, Mar 5, 2015 at 1:56 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  I changed spark.shuffle.blockTransferService to nio and now I'm getting
> OOM errors, I'm doing a big join operation.
>
>
>
>
>
> 15/03/04 19:04:07 ERROR Executor: Exception in task 107.0 in stage 2.0
> (TID 6207)
>
> java.lang.OutOfMemoryError: Java heap space
>
>         at
> org.apache.spark.util.collection.CompactBuffer.growToSize(CompactBuffer.scala:142)
>
>         at
> org.apache.spark.util.collection.CompactBuffer.$plus$eq(CompactBuffer.scala:74)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:179)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$5.apply(CoGroupedRDD.scala:178)
>
>         at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:122)
>
>         at
> org.apache.spark.util.collection.ExternalAppendOnlyMap$$anonfun$2.apply(ExternalAppendOnlyMap.scala:121)
>
>         at
> org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:138)
>
>         at
> org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
>
>         at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:130)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
>         at
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
>         at
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
>
>
> Is join/cogroup still memory bound?
>
>
>
>
>
> Jianshi
>
>
>
>
>
>
>
> On Wed, Mar 4, 2015 at 2:11 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  Hmm... ok, previous errors are still block fetch errors.
>
>
>
> 15/03/03 10:22:40 ERROR RetryingBlockFetcher: Exception while beginning
> fetch of 11 outstanding blocks
>
> java.io.IOException: Failed to connect to host-xxxx/xxxx:55597
>
>         at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>
>         at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>
>         at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>
>         at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>
>         at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
>
>         at
> org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks(NettyBlockTransferService.scala:87)
>
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:149)
>
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:289)
>
>         at
> org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:53)
>
>         at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>
>         at
> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>
>         at
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>
>         at
> org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:160)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>
>         at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>
>         at
> org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:159)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at
> org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at
> org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>
>         at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>
>         at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:197)
>
>         at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>         at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>         at java.lang.Thread.run(Thread.java:724)
>
> Caused by: java.net.ConnectException: Connection refused:
> lvshdc5dn0518.lvs.paypal.com/10.196.244.48:55597
>
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>
>         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:735)
>
>         at
> io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:208)
>
>         at
> io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:287)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>
>
>
> And I checked executor on container host-xxxx, everything is good.
>
>
>
> Jianshi
>
>
>
>
>
> On Wed, Mar 4, 2015 at 12:28 PM, Aaron Davidson <ilike...@gmail.com>
> wrote:
>
>  Drat! That doesn't help. Could you scan from the top to see if there
> were any fatal errors preceding these? Sometimes a OOM will cause this type
> of issue further down.
>
>
>
> On Tue, Mar 3, 2015 at 8:16 PM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  The failed executor has the following error messages. Any hints?
>
>
>
> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 5711039715419258699
>
> java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>
>         at java.lang.Thread.run(Thread.java:724)
>
> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 7941985280808455530
>
> java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>
>         at java.lang.Thread.run(Thread.java:724)
>
> 15/03/03 10:22:41 ERROR TransportRequestHandler: Error while invoking
> RpcHandler#receive() on RPC id 5413737659722448543
>
> java.io.FileNotFoundException:
> /hadoop03/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-88ee/17/shuffle_0_1074_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
>
>         at
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
>
>         at
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
>
>         at
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
>
>         at
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
>
>         at
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
>
>         at
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>
>         at
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>
>         at
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
>
>         at java.lang.Thread.run(Thread.java:724)
>
>
>
>
>
> Jianshi
>
>
>
> On Wed, Mar 4, 2015 at 3:25 AM, Aaron Davidson <ilike...@gmail.com> wrote:
>
>  "Failed to connect" implies that the executor at that host died, please
> check its logs as well.
>
>
>
> On Tue, Mar 3, 2015 at 11:03 AM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  Sorry that I forgot the subject.
>
>
>
> And in the driver, I got many FetchFailedException. The error messages are
>
>
>
> 15/03/03 10:34:32 WARN TaskSetManager: Lost task 31.0 in stage 2.2 (TID
> 7943, xxxx): FetchFailed(BlockManagerId(86, xxxx, 43070), shuffleId=0,
> mapId=24, reduceId=1220, message=
>
> org.apache.spark.shuffle.FetchFailedException: Failed to connect to
> xxxx/xxxx:43070
>
>         at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
>
>         at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>
>         at
> org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
>
>
>
>
>
> Jianshi
>
>
>
> On Wed, Mar 4, 2015 at 2:55 AM, Jianshi Huang <jianshi.hu...@gmail.com>
> wrote:
>
>  Hi,
>
>
>
> I got this error message:
>
>
>
> 15/03/03 10:22:41 ERROR OneForOneBlockFetcher: Failed while starting block
> fetches
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>         at java.io.FileInputStream.open(Native Method)
>
>         at java.io.FileInputStream.<init>(FileInputStream.java:146)
>
>         at
> org.apache.spark.shuffle.IndexShuffleBlockManager.getBlockData(IndexShuffleBlockManager.scala:109)
>
>         at
> org.apache.spark.storage.BlockManager.getBlockData(BlockManager.scala:305)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer$$anonfun$2.apply(NettyBlockRpcServer.scala:57)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>
>         at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
>         at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>
>         at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>
>         at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>
>         at
> org.apache.spark.network.netty.NettyBlockRpcServer.receive(NettyBlockRpcServer.scala:57)
>
>
>
>
>
> And then for the same index file and executor, I got the following errors
> multiple times
>
>
>
> 15/03/03 10:22:41 ERROR ShuffleBlockFetcherIterator: Failed to get
> block(s) from host-xxxx:39534
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>
>
> 15/03/03 10:22:41 ERROR RetryingBlockFetcher: Failed to fetch block
> shuffle_0_13_1228, and will not retry (0 retries)
>
> java.lang.RuntimeException: java.io.FileNotFoundException:
> /hadoop01/scratch/local/usercache/jianshuang/appcache/application_1421268539738_202330/spark-local-20150303100549-fc3b/02/shuffle_0_1458_0.index
> (No such file or directory)
>
>
>
> ...
>
> Caused by: java.net.ConnectException: Connection refused: host-xxxx....
>
>
>
>
>
> What's the problem?
>
>
>
> BTW, I'm using Spark 1.2.1-SNAPSHOT I built around Dec. 20. Is there any
> bug fixes related to shuffle block fetching or index files after that?
>
>
>
>
>  Thanks,
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>
>
>
>
>
> --
>
> Jianshi Huang
>
> LinkedIn: jianshi
> Twitter: @jshuang
> Github & Blog: http://huangjs.github.com/
>



-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/

Reply via email to