Re: Massive fetch fails, io errors in TransportRequestHandler

2017-09-28 Thread Vadim Semenov
It looks like there's slowness in sending shuffle files; maybe one executor
gets overwhelmed with all the other executors trying to pull data from it?
Try raising `spark.network.timeout` further; we ourselves had to push it to
600s from the default 120s.
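
For reference, this is roughly how we pass it at submit time; a minimal sketch only, and the 600s value is just what worked for our workload rather than a magic number:

spark-submit \
  --conf spark.network.timeout=600s \
  ... rest of your usual submit options ...

The same key can also go into spark-defaults.conf; it mainly needs to comfortably exceed the longest pause you expect between a request and its response.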

On Thu, Sep 28, 2017 at 10:19 AM, Ilya Karpov wrote:

> Hi,
> I see strange behaviour in my job and can’t understand what is wrong:
> the stage that uses shuffle data as its input fails a number of times
> because of org.apache.spark.shuffle.FetchFailedException, seen in the Spark UI:
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=50192, reduceId=12698, message=
> FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=3, reduceId=12699, message=
> […]

Massive fetch fails, io errors in TransportRequestHandler

2017-09-28 Thread Ilya Karpov
Hi,
I see strange behaviour in my job and can’t understand what is wrong:
the stage that uses shuffle data as its input fails a number of times because
of org.apache.spark.shuffle.FetchFailedException, seen in the Spark UI:
FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=50192, reduceId=12698, message=
FetchFailed(BlockManagerId(8, hostname, 11431, None), shuffleId=1, mapId=3, reduceId=12699, message=

Digging into the logs, I found the following task-failure scenario:
1. some shuffle-server-X-Y (important note: the external shuffle service is OFF)
reports 'Broken pipe' at 2017-09-26T05:40:26.484Z:
java.io.IOException: Broken pipe
    at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
    at sun.nio.ch.FileChannelImpl.transferToDirectlyInternal(FileChannelImpl.java:428)
    at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:493)
    at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:608)
    at io.netty.channel.DefaultFileRegion.transferTo(DefaultFileRegion.java:139)
    at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:121)
    at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:287)
    at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:237)
    at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:314)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:802)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:313)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:770)

and "chunk send" errors:
Error sending result 
ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=65546478185, 
chunkIndex=0}, 
buffer=FileSegmentManagedBuffer{file=/data/1/yarn/nm/usercache/hdfs/appcache/application_1505206571245_2989/blockmgr-9be47304-ffe2-443a-bb10-33a89928f5b9/04/shuffle_1_3_0.data,
 offset=40858881, length=3208}} to /someClientIP:somePort; closing connection
with exceptions:
java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown 
Source)

2. then the client of this shuffle server complains:
Connection to some-hostname/someIP:port has been quiet for 24 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong.
and then
Still have 3386 requests outstanding when connection from some-hostname/someIP:11431 is closed
and then
java.io.IOException: Connection from shuffleServerHostname/shuffleServerIP:port closed
    at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:146)
    at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:108)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:278)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:241)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:227)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:220)
    at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75)

This fails tasks and the stage several times, and finally the job fails.

I also see retry messages: Failed to fetch block shuffle_1_3_12682, and will not retry (5 retries).
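
If I read the fetch path correctly, that retry limit comes from the shuffle client's retry settings rather than from anything job-specific; a rough sketch of raising them at submit time, with purely illustrative values:

--conf spark.shuffle.io.maxRetries=10 \
--conf spark.shuffle.io.retryWait=30s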

Such failures occur on different hosts.

I can’t say that we were experiencing any network connectivity issues, node failures or anything similar. It seems the connection was dropped by some Spark-internal mechanism.

Any guesses as to the reason are appreciated!

Spark 2.2.0

Spark config:
--num-executors 39 \
--conf spark.dynamicAllocation.enabled=f